diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/metadata.yaml b/airbyte-integrations/connectors/destination-s3-data-lake/metadata.yaml index c500835d818e..793e9e54b179 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3-data-lake/metadata.yaml @@ -26,7 +26,7 @@ data: alias: airbyte-connector-testing-secret-store connectorType: destination definitionId: 716ca874-520b-4902-9f80-9fad66754b89 - dockerImageTag: 0.2.8 + dockerImageTag: 0.2.9 dockerRepository: airbyte/destination-s3-data-lake documentationUrl: https://docs.airbyte.com/integrations/destinations/s3-data-lake githubIssueLabel: destination-s3-data-lake diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergV2Checker.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeChecker.kt similarity index 59% rename from airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergV2Checker.kt rename to airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeChecker.kt index aed868463982..3b51afd8fab2 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergV2Checker.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeChecker.kt @@ -6,25 +6,25 @@ package io.airbyte.integrations.destination.s3_data_lake import io.airbyte.cdk.load.check.DestinationChecker import io.airbyte.cdk.load.command.DestinationStream -import io.airbyte.integrations.destination.s3_data_lake.io.IcebergTableCleaner -import io.airbyte.integrations.destination.s3_data_lake.io.IcebergUtil +import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeTableCleaner +import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeUtil import javax.inject.Singleton import org.apache.iceberg.Schema import org.apache.iceberg.types.Types @Singleton -class IcebergV2Checker( - private val icebergTableCleaner: IcebergTableCleaner, - private val icebergUtil: IcebergUtil, +class S3DataLakeChecker( + private val s3DataLakeTableCleaner: S3DataLakeTableCleaner, + private val s3DataLakeUtil: S3DataLakeUtil, private val tableIdGenerator: TableIdGenerator, -) : DestinationChecker { +) : DestinationChecker { - override fun check(config: IcebergV2Configuration) { + override fun check(config: S3DataLakeConfiguration) { catalogValidation(config) } - private fun catalogValidation(config: IcebergV2Configuration) { - val catalogProperties = icebergUtil.toCatalogProperties(config) - val catalog = icebergUtil.createCatalog(DEFAULT_CATALOG_NAME, catalogProperties) + private fun catalogValidation(config: S3DataLakeConfiguration) { + val catalogProperties = s3DataLakeUtil.toCatalogProperties(config) + val catalog = s3DataLakeUtil.createCatalog(DEFAULT_CATALOG_NAME, catalogProperties) val testTableIdentifier = DestinationStream.Descriptor(TEST_NAMESPACE, TEST_TABLE) @@ -34,14 +34,14 @@ class IcebergV2Checker( Types.NestedField.optional(2, "data", Types.StringType.get()), ) val table = - icebergUtil.createTable( + s3DataLakeUtil.createTable( testTableIdentifier, catalog, testTableSchema, catalogProperties, ) - icebergTableCleaner.clearTable( + s3DataLakeTableCleaner.clearTable( catalog, tableIdGenerator.toTableIdentifier(testTableIdentifier), table.io(), diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergV2Configuration.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeConfiguration.kt similarity index 78% rename from airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergV2Configuration.kt rename to airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeConfiguration.kt index 1da1303a93c7..f704391ac47c 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergV2Configuration.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeConfiguration.kt @@ -20,7 +20,7 @@ const val DEFAULT_STAGING_BRANCH = "airbyte_staging" const val TEST_NAMESPACE = "airbyte_test_namespace" const val TEST_TABLE = "airbyte_test_table" -data class IcebergV2Configuration( +data class S3DataLakeConfiguration( override val awsAccessKeyConfiguration: AWSAccessKeyConfiguration, override val s3BucketConfiguration: S3BucketConfiguration, override val icebergCatalogConfiguration: IcebergCatalogConfiguration @@ -31,12 +31,12 @@ data class IcebergV2Configuration( S3BucketConfigurationProvider @Singleton -class IcebergV2ConfigurationFactory : - DestinationConfigurationFactory { +class S3DataLakeConfigurationFactory : + DestinationConfigurationFactory { override fun makeWithoutExceptionHandling( - pojo: IcebergV2Specification - ): IcebergV2Configuration { - return IcebergV2Configuration( + pojo: S3DataLakeSpecification + ): S3DataLakeConfiguration { + return S3DataLakeConfiguration( awsAccessKeyConfiguration = pojo.toAWSAccessKeyConfiguration(), s3BucketConfiguration = pojo.toS3BucketConfiguration(), icebergCatalogConfiguration = pojo.toIcebergCatalogConfiguration(), @@ -45,9 +45,9 @@ class IcebergV2ConfigurationFactory : } @Factory -class IcebergV2ConfigurationProvider(private val config: DestinationConfiguration) { +class S3DataLakeConfigurationProvider(private val config: DestinationConfiguration) { @Singleton - fun get(): IcebergV2Configuration { - return config as IcebergV2Configuration + fun get(): S3DataLakeConfiguration { + return config as S3DataLakeConfiguration } } diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergDestination.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeDestination.kt similarity index 100% rename from airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergDestination.kt rename to airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeDestination.kt diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergV2Specification.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeSpecification.kt similarity index 96% rename from airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergV2Specification.kt rename to airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeSpecification.kt index 701107af4634..f78e35de942c 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergV2Specification.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeSpecification.kt @@ -20,7 +20,7 @@ import jakarta.inject.Singleton @Singleton @JsonSchemaTitle("Iceberg V2 Destination Specification") -class IcebergV2Specification : +class S3DataLakeSpecification : ConfigurationSpecification(), AWSAccessKeySpecification, S3BucketSpecification, @@ -61,7 +61,7 @@ class IcebergV2Specification : } @Singleton -class IcebergV2SpecificationExtension : DestinationSpecificationExtension { +class S3DataLakeSpecificationExtension : DestinationSpecificationExtension { override val supportedSyncModes = listOf( DestinationSyncMode.OVERWRITE, diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergStreamLoader.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeStreamLoader.kt similarity index 83% rename from airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergStreamLoader.kt rename to airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeStreamLoader.kt index 38840d29c1c6..d44bc073fdaf 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergStreamLoader.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeStreamLoader.kt @@ -12,18 +12,18 @@ import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue import io.airbyte.cdk.load.message.SimpleBatch import io.airbyte.cdk.load.state.StreamProcessingFailed import io.airbyte.cdk.load.write.StreamLoader -import io.airbyte.integrations.destination.s3_data_lake.io.IcebergTableCleaner -import io.airbyte.integrations.destination.s3_data_lake.io.IcebergTableWriterFactory -import io.airbyte.integrations.destination.s3_data_lake.io.IcebergUtil +import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeTableCleaner +import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeTableWriterFactory +import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeUtil import io.github.oshai.kotlinlogging.KotlinLogging import org.apache.iceberg.Table @SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION", justification = "Kotlin async continuation") -class IcebergStreamLoader( +class S3DataLakeStreamLoader( override val stream: DestinationStream, private val table: Table, - private val icebergTableWriterFactory: IcebergTableWriterFactory, - private val icebergUtil: IcebergUtil, + private val s3DataLakeTableWriterFactory: S3DataLakeTableWriterFactory, + private val s3DataLakeUtil: S3DataLakeUtil, private val pipeline: MapperPipeline, private val stagingBranchName: String, private val mainBranchName: String @@ -35,17 +35,17 @@ class IcebergStreamLoader( totalSizeBytes: Long, endOfStream: Boolean ): Batch { - icebergTableWriterFactory + s3DataLakeTableWriterFactory .create( table = table, - generationId = icebergUtil.constructGenerationIdSuffix(stream), + generationId = s3DataLakeUtil.constructGenerationIdSuffix(stream), importType = stream.importType ) .use { writer -> log.info { "Writing records to branch $stagingBranchName" } records.forEach { record -> val icebergRecord = - icebergUtil.toRecord( + s3DataLakeUtil.toRecord( record = record, stream = stream, tableSchema = table.schema(), @@ -84,10 +84,10 @@ class IcebergStreamLoader( } val generationIdsToDelete = (0 until stream.minimumGenerationId).map( - icebergUtil::constructGenerationIdSuffix + s3DataLakeUtil::constructGenerationIdSuffix ) - val icebergTableCleaner = IcebergTableCleaner(icebergUtil = icebergUtil) - icebergTableCleaner.deleteGenerationId( + val s3DataLakeTableCleaner = S3DataLakeTableCleaner(s3DataLakeUtil = s3DataLakeUtil) + s3DataLakeTableCleaner.deleteGenerationId( table, stagingBranchName, generationIdsToDelete diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergV2Writer.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriter.kt similarity index 80% rename from airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergV2Writer.kt rename to airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriter.kt index e4f82a0f5664..e5d3d832dd27 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergV2Writer.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriter.kt @@ -8,25 +8,25 @@ import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.data.iceberg.parquet.IcebergParquetPipelineFactory import io.airbyte.cdk.load.write.DestinationWriter import io.airbyte.cdk.load.write.StreamLoader -import io.airbyte.integrations.destination.s3_data_lake.io.IcebergTableWriterFactory -import io.airbyte.integrations.destination.s3_data_lake.io.IcebergUtil +import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeTableWriterFactory +import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeUtil import javax.inject.Singleton import org.apache.iceberg.Schema @Singleton -class IcebergV2Writer( - private val icebergTableWriterFactory: IcebergTableWriterFactory, - private val icebergConfiguration: IcebergV2Configuration, - private val icebergUtil: IcebergUtil +class S3DataLakeWriter( + private val s3DataLakeTableWriterFactory: S3DataLakeTableWriterFactory, + private val icebergConfiguration: S3DataLakeConfiguration, + private val s3DataLakeUtil: S3DataLakeUtil ) : DestinationWriter { override fun createStreamLoader(stream: DestinationStream): StreamLoader { - val properties = icebergUtil.toCatalogProperties(config = icebergConfiguration) - val catalog = icebergUtil.createCatalog(DEFAULT_CATALOG_NAME, properties) + val properties = s3DataLakeUtil.toCatalogProperties(config = icebergConfiguration) + val catalog = s3DataLakeUtil.createCatalog(DEFAULT_CATALOG_NAME, properties) val pipeline = IcebergParquetPipelineFactory().create(stream) - val schema = icebergUtil.toIcebergSchema(stream = stream, pipeline = pipeline) + val schema = s3DataLakeUtil.toIcebergSchema(stream = stream, pipeline = pipeline) val table = - icebergUtil.createTable( + s3DataLakeUtil.createTable( streamDescriptor = stream.descriptor, catalog = catalog, schema = schema, @@ -35,11 +35,11 @@ class IcebergV2Writer( existingAndIncomingSchemaShouldBeSame(catalogSchema = schema, tableSchema = table.schema()) - return IcebergStreamLoader( + return S3DataLakeStreamLoader( stream = stream, table = table, - icebergTableWriterFactory = icebergTableWriterFactory, - icebergUtil = icebergUtil, + s3DataLakeTableWriterFactory = s3DataLakeTableWriterFactory, + s3DataLakeUtil = s3DataLakeUtil, pipeline = pipeline, stagingBranchName = DEFAULT_STAGING_BRANCH, mainBranchName = icebergConfiguration.icebergCatalogConfiguration.mainBranchName, diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/TableIdGenerator.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/TableIdGenerator.kt index 30e27df54f30..d83db691315e 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/TableIdGenerator.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/TableIdGenerator.kt @@ -32,7 +32,7 @@ class GlueTableIdGenerator : TableIdGenerator { } @Factory -class TableIdGeneratorFactory(private val icebergConfiguration: IcebergV2Configuration) { +class TableIdGeneratorFactory(private val icebergConfiguration: S3DataLakeConfiguration) { @Singleton fun create() = when (icebergConfiguration.icebergCatalogConfiguration.catalogConfiguration) { diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/IcebergTableCleaner.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeTableCleaner.kt similarity index 93% rename from airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/IcebergTableCleaner.kt rename to airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeTableCleaner.kt index 4507f280f763..2513712f7a04 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/IcebergTableCleaner.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeTableCleaner.kt @@ -16,7 +16,7 @@ import org.apache.iceberg.io.SupportsPrefixOperations * catalog implementations do not clear the underlying files written to table storage. */ @Singleton -class IcebergTableCleaner(private val icebergUtil: IcebergUtil) { +class S3DataLakeTableCleaner(private val s3DataLakeUtil: S3DataLakeUtil) { /** * Clears the table identified by the provided [TableIdentifier]. This removes all data and @@ -49,7 +49,7 @@ class IcebergTableCleaner(private val icebergUtil: IcebergUtil) { val genIdsToDelete = generationIdSuffix .filter { - icebergUtil.assertGenerationIdSuffixIsOfValidFormat(it) + s3DataLakeUtil.assertGenerationIdSuffixIsOfValidFormat(it) true } .toSet() diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/IcebergTableWriterFactory.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeTableWriterFactory.kt similarity index 97% rename from airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/IcebergTableWriterFactory.kt rename to airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeTableWriterFactory.kt index 8bfe0a333197..1d704b7c3311 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/IcebergTableWriterFactory.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeTableWriterFactory.kt @@ -29,7 +29,7 @@ import org.apache.iceberg.util.PropertyUtil * and whether primary keys are configured on the destination table's schema. */ @Singleton -class IcebergTableWriterFactory(private val icebergUtil: IcebergUtil) { +class S3DataLakeTableWriterFactory(private val s3DataLakeUtil: S3DataLakeUtil) { /** * Creates a new [BaseTaskWriter] based on the configuration of the destination target [Table]. * @@ -39,7 +39,7 @@ class IcebergTableWriterFactory(private val icebergUtil: IcebergUtil) { * @return The [BaseTaskWriter] that writes records to the target [Table]. */ fun create(table: Table, generationId: String, importType: ImportType): BaseTaskWriter { - icebergUtil.assertGenerationIdSuffixIsOfValidFormat(generationId) + s3DataLakeUtil.assertGenerationIdSuffixIsOfValidFormat(generationId) val format = FileFormat.valueOf( table diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/IcebergUtil.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeUtil.kt similarity index 97% rename from airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/IcebergUtil.kt rename to airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeUtil.kt index e8901207844e..4e694978c11d 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/IcebergUtil.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeUtil.kt @@ -20,7 +20,7 @@ import io.airbyte.cdk.load.data.withAirbyteMeta import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue import io.airbyte.integrations.destination.s3_data_lake.ACCESS_KEY_ID import io.airbyte.integrations.destination.s3_data_lake.GlueCredentialsProvider -import io.airbyte.integrations.destination.s3_data_lake.IcebergV2Configuration +import io.airbyte.integrations.destination.s3_data_lake.S3DataLakeConfiguration import io.airbyte.integrations.destination.s3_data_lake.SECRET_ACCESS_KEY import io.airbyte.integrations.destination.s3_data_lake.TableIdGenerator import io.github.oshai.kotlinlogging.KotlinLogging @@ -68,7 +68,7 @@ data class AWSSystemCredentials( * will be removed when we change all of this to use Micronaut */ @Singleton -class IcebergUtil( +class S3DataLakeUtil( private val tableIdGenerator: TableIdGenerator, val awsSystemCredentials: AWSSystemCredentials? = null ) { @@ -202,7 +202,7 @@ class IcebergUtil( * @param config The destination's configuration * @return The Iceberg [Catalog] configuration properties. */ - fun toCatalogProperties(config: IcebergV2Configuration): Map { + fun toCatalogProperties(config: S3DataLakeConfiguration): Map { val icebergCatalogConfig = config.icebergCatalogConfiguration val catalogConfig = icebergCatalogConfig.catalogConfiguration val region = config.s3BucketConfiguration.s3BucketRegion.region @@ -226,7 +226,7 @@ class IcebergUtil( } private fun buildS3Properties( - config: IcebergV2Configuration, + config: S3DataLakeConfiguration, icebergCatalogConfig: IcebergCatalogConfiguration ): Map { return buildMap { @@ -242,7 +242,7 @@ class IcebergUtil( } private fun buildNessieProperties( - config: IcebergV2Configuration, + config: S3DataLakeConfiguration, catalogConfig: NessieCatalogConfiguration, s3Properties: Map ): Map { @@ -276,7 +276,7 @@ class IcebergUtil( } private fun buildGlueProperties( - config: IcebergV2Configuration, + config: S3DataLakeConfiguration, catalogConfig: GlueCatalogConfiguration, icebergCatalogConfig: IcebergCatalogConfiguration ): Map { @@ -302,7 +302,7 @@ class IcebergUtil( private fun buildRoleBasedClientProperties( roleArn: String, - config: IcebergV2Configuration + config: S3DataLakeConfiguration ): Map { val region = config.s3BucketConfiguration.s3BucketRegion.region val (accessKeyId, secretAccessKey, externalId) = @@ -339,7 +339,9 @@ class IcebergUtil( ) } - private fun buildKeyBasedClientProperties(config: IcebergV2Configuration): Map { + private fun buildKeyBasedClientProperties( + config: S3DataLakeConfiguration + ): Map { val awsAccessKeyId = requireNotNull(config.awsAccessKeyConfiguration.accessKeyId) { "AWS Access Key ID is required for key-based authentication" diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergV2CheckTest.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeCheckTest.kt similarity index 74% rename from airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergV2CheckTest.kt rename to airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeCheckTest.kt index 080ad38bdb0d..5ae4c1e2986e 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergV2CheckTest.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeCheckTest.kt @@ -6,10 +6,10 @@ package io.airbyte.integrations.destination.s3_data_lake import io.airbyte.cdk.load.check.CheckIntegrationTest import io.airbyte.cdk.load.check.CheckTestConfig -import io.airbyte.integrations.destination.s3_data_lake.IcebergV2TestUtil.GLUE_CONFIG_PATH +import io.airbyte.integrations.destination.s3_data_lake.S3DataLakeTestUtil.GLUE_CONFIG_PATH -class IcebergV2CheckTest : - CheckIntegrationTest( +class S3DataLakeCheckTest : + CheckIntegrationTest( successConfigFilenames = listOf( CheckTestConfig(GLUE_CONFIG_PATH), diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergV2DataDumper.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeDataDumper.kt similarity index 95% rename from airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergV2DataDumper.kt rename to airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeDataDumper.kt index ffe5df693df6..1a3dc60c9376 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergV2DataDumper.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeDataDumper.kt @@ -17,7 +17,7 @@ import java.util.UUID import org.apache.iceberg.data.IcebergGenerics import org.apache.iceberg.data.Record -object IcebergV2DataDumper : DestinationDataDumper { +object S3DataLakeDataDumper : DestinationDataDumper { private fun toAirbyteValue(record: Record): ObjectValue { return ObjectValue( @@ -71,9 +71,9 @@ object IcebergV2DataDumper : DestinationDataDumper { spec: ConfigurationSpecification, stream: DestinationStream ): List { - val config = IcebergV2TestUtil.getConfig(spec) + val config = S3DataLakeTestUtil.getConfig(spec) val catalog = - IcebergV2TestUtil.getCatalog(config, IcebergV2TestUtil.getAWSSystemCredentials()) + S3DataLakeTestUtil.getCatalog(config, S3DataLakeTestUtil.getAWSSystemCredentials()) val table = catalog.loadTable( TableIdGeneratorFactory(config).create().toTableIdentifier(stream.descriptor) diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergDestinationCleaner.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeDestinationCleaner.kt similarity index 83% rename from airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergDestinationCleaner.kt rename to airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeDestinationCleaner.kt index 209801c44409..9854ddd24fef 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergDestinationCleaner.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeDestinationCleaner.kt @@ -7,8 +7,8 @@ package io.airbyte.integrations.destination.s3_data_lake import io.airbyte.cdk.load.test.util.DestinationCleaner import io.airbyte.cdk.load.test.util.IntegrationTest.Companion.isNamespaceOld import io.airbyte.cdk.load.test.util.IntegrationTest.Companion.randomizedNamespaceRegex -import io.airbyte.integrations.destination.s3_data_lake.io.IcebergTableCleaner -import io.airbyte.integrations.destination.s3_data_lake.io.IcebergUtil +import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeTableCleaner +import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeUtil import org.apache.iceberg.catalog.Catalog import org.apache.iceberg.catalog.Namespace import org.apache.iceberg.catalog.SupportsNamespaces @@ -22,7 +22,7 @@ class S3DataLakeDestinationCleaner(private val catalog: Catalog) : DestinationCl } // we're passing explicit TableIdentifier to clearTable, so just use SimpleTableIdGenerator - val tableCleaner = IcebergTableCleaner(IcebergUtil(SimpleTableIdGenerator())) + val tableCleaner = S3DataLakeTableCleaner(S3DataLakeUtil(SimpleTableIdGenerator())) namespaces.forEach { namespace -> catalog.listTables(namespace).forEach { tableId -> diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergExpectedRecordMapper.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeExpectedRecordMapper.kt similarity index 95% rename from airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergExpectedRecordMapper.kt rename to airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeExpectedRecordMapper.kt index 833d6adac735..b86c45a624a6 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergExpectedRecordMapper.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeExpectedRecordMapper.kt @@ -17,7 +17,7 @@ import io.airbyte.cdk.load.test.util.OutputRecord * Iceberg doesn't have a TimeWithTimezone type. So map expectedRecords' TimeWithTimezone to * TimeWithoutTimezone. */ -object IcebergExpectedRecordMapper : ExpectedRecordMapper { +object S3DataLakeExpectedRecordMapper : ExpectedRecordMapper { override fun mapRecord(expectedRecord: OutputRecord, schema: AirbyteType): OutputRecord { val mappedData = mapTimeTzToTimeNtz(expectedRecord.data) return expectedRecord.copy(data = mappedData as ObjectValue) diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergV2SpecTest.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeSpecTest.kt similarity index 81% rename from airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergV2SpecTest.kt rename to airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeSpecTest.kt index 91b1c641ce1f..171a4ec3b641 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergV2SpecTest.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeSpecTest.kt @@ -6,4 +6,4 @@ package io.airbyte.integrations.destination.s3_data_lake import io.airbyte.cdk.load.spec.SpecTest -class IcebergV2SpecTest : SpecTest() +class S3DataLakeSpecTest : SpecTest() diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergV2TestUtil.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeTestUtil.kt similarity index 73% rename from airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergV2TestUtil.kt rename to airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeTestUtil.kt index 588a10d231c3..d03fc4be4420 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergV2TestUtil.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeTestUtil.kt @@ -9,11 +9,11 @@ import io.airbyte.cdk.command.ConfigurationSpecification import io.airbyte.cdk.command.ValidatedJsonUtils import io.airbyte.cdk.load.util.Jsons import io.airbyte.integrations.destination.s3_data_lake.io.AWSSystemCredentials -import io.airbyte.integrations.destination.s3_data_lake.io.IcebergUtil +import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeUtil import java.nio.file.Files import java.nio.file.Path -object IcebergV2TestUtil { +object S3DataLakeTestUtil { val GLUE_CONFIG_PATH: Path = Path.of("secrets/glue.json") val GLUE_ASSUME_ROLE_CONFIG_PATH: Path = Path.of("secrets/glue_assume_role.json") private val GLUE_AWS_ASSUME_ROLE_CONFIG_PATH: Path = @@ -21,7 +21,7 @@ object IcebergV2TestUtil { fun parseConfig(path: Path) = getConfig( - ValidatedJsonUtils.parseOne(IcebergV2Specification::class.java, Files.readString(path)) + ValidatedJsonUtils.parseOne(S3DataLakeSpecification::class.java, Files.readString(path)) ) fun getAWSSystemCredentials(): AWSSystemCredentials { @@ -35,10 +35,11 @@ object IcebergV2TestUtil { } fun getConfig(spec: ConfigurationSpecification) = - IcebergV2ConfigurationFactory().makeWithoutExceptionHandling(spec as IcebergV2Specification) + S3DataLakeConfigurationFactory() + .makeWithoutExceptionHandling(spec as S3DataLakeSpecification) - fun getCatalog(config: IcebergV2Configuration, awsSystemCredentials: AWSSystemCredentials) = - IcebergUtil(SimpleTableIdGenerator(), awsSystemCredentials).let { icebergUtil -> + fun getCatalog(config: S3DataLakeConfiguration, awsSystemCredentials: AWSSystemCredentials) = + S3DataLakeUtil(SimpleTableIdGenerator(), awsSystemCredentials).let { icebergUtil -> val props = icebergUtil.toCatalogProperties(config) icebergUtil.createCatalog(DEFAULT_CATALOG_NAME, props) } diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergV2WriteTest.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriteTest.kt similarity index 85% rename from airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergV2WriteTest.kt rename to airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriteTest.kt index fe17eca2ff5b..80e8456f7b7c 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergV2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriteTest.kt @@ -19,17 +19,17 @@ import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test -abstract class IcebergV2WriteTest( +abstract class S3DataLakeWriteTest( configContents: String, destinationCleaner: DestinationCleaner, envVars: Map = emptyMap(), ) : BasicFunctionalityIntegrationTest( configContents, - IcebergV2Specification::class.java, - IcebergV2DataDumper, + S3DataLakeSpecification::class.java, + S3DataLakeDataDumper, destinationCleaner, - IcebergExpectedRecordMapper, + S3DataLakeExpectedRecordMapper, isStreamSchemaRetroactive = true, supportsDedup = true, stringifySchemalessObjects = true, @@ -83,34 +83,34 @@ abstract class IcebergV2WriteTest( } } -class IcebergGlueWriteTest : - IcebergV2WriteTest( - Files.readString(IcebergV2TestUtil.GLUE_CONFIG_PATH), +class GlueWriteTest : + S3DataLakeWriteTest( + Files.readString(S3DataLakeTestUtil.GLUE_CONFIG_PATH), S3DataLakeDestinationCleaner( - IcebergV2TestUtil.getCatalog( - IcebergV2TestUtil.parseConfig(IcebergV2TestUtil.GLUE_CONFIG_PATH), - IcebergV2TestUtil.getAWSSystemCredentials() + S3DataLakeTestUtil.getCatalog( + S3DataLakeTestUtil.parseConfig(S3DataLakeTestUtil.GLUE_CONFIG_PATH), + S3DataLakeTestUtil.getAWSSystemCredentials() ) ) ) -class IcebergGlueAssumeRoleWriteTest : - IcebergV2WriteTest( - Files.readString(IcebergV2TestUtil.GLUE_ASSUME_ROLE_CONFIG_PATH), +class GlueAssumeRoleWriteTest : + S3DataLakeWriteTest( + Files.readString(S3DataLakeTestUtil.GLUE_ASSUME_ROLE_CONFIG_PATH), S3DataLakeDestinationCleaner( - IcebergV2TestUtil.getCatalog( - IcebergV2TestUtil.parseConfig(IcebergV2TestUtil.GLUE_ASSUME_ROLE_CONFIG_PATH), - IcebergV2TestUtil.getAWSSystemCredentials() + S3DataLakeTestUtil.getCatalog( + S3DataLakeTestUtil.parseConfig(S3DataLakeTestUtil.GLUE_ASSUME_ROLE_CONFIG_PATH), + S3DataLakeTestUtil.getAWSSystemCredentials() ) ), - IcebergV2TestUtil.getAWSSystemCredentialsAsMap() + S3DataLakeTestUtil.getAWSSystemCredentialsAsMap() ) @Disabled( "This is currently disabled until we are able to make it run via airbyte-ci. It works as expected locally" ) -class IcebergNessieMinioWriteTest : - IcebergV2WriteTest( +class NessieMinioWriteTest : + S3DataLakeWriteTest( getConfig(), // we're writing to ephemeral testcontainers, so no need to clean up after ourselves NoopDestinationCleaner diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergV2WriterTest.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriterTest.kt similarity index 89% rename from airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergV2WriterTest.kt rename to airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriterTest.kt index ba88a4526ed9..bdef5c680134 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/IcebergV2WriterTest.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriterTest.kt @@ -23,8 +23,8 @@ import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_EXTRACTED_AT import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_GENERATION_ID import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_META import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_RAW_ID -import io.airbyte.integrations.destination.s3_data_lake.io.IcebergTableWriterFactory -import io.airbyte.integrations.destination.s3_data_lake.io.IcebergUtil +import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeTableWriterFactory +import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeUtil import io.mockk.every import io.mockk.mockk import org.apache.iceberg.Schema @@ -36,7 +36,7 @@ import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows -internal class IcebergV2WriterTest { +internal class S3DataLakeWriterTest { @Test fun testCreateStreamLoader() { @@ -95,7 +95,7 @@ internal class IcebergV2WriterTest { ), Types.NestedField.of(12, false, COLUMN_NAME_AB_GENERATION_ID, Types.LongType.get()), ) - val icebergTableWriterFactory: IcebergTableWriterFactory = mockk() + val s3DataLakeTableWriterFactory: S3DataLakeTableWriterFactory = mockk() val awsConfiguration: AWSAccessKeyConfiguration = mockk { every { accessKeyId } returns "access-key" every { secretAccessKey } returns "secret-access-key" @@ -112,14 +112,14 @@ internal class IcebergV2WriterTest { every { catalogConfiguration } returns NessieCatalogConfiguration("http://localhost:8080/api/v1", "access-token") } - val icebergConfiguration: IcebergV2Configuration = mockk { + val icebergConfiguration: S3DataLakeConfiguration = mockk { every { awsAccessKeyConfiguration } returns awsConfiguration every { icebergCatalogConfiguration } returns icebergCatalogConfig every { s3BucketConfiguration } returns bucketConfiguration } val catalog: Catalog = mockk() val table: Table = mockk { every { schema() } returns icebergSchema } - val icebergUtil: IcebergUtil = mockk { + val s3DataLakeUtil: S3DataLakeUtil = mockk { every { createCatalog(any(), any()) } returns catalog every { createTable(any(), any(), any(), any()) } returns table every { toCatalogProperties(any()) } returns mapOf() @@ -129,13 +129,13 @@ internal class IcebergV2WriterTest { pipeline.finalSchema.withAirbyteMeta(true).toIcebergSchema(emptyList()) } } - val icebergV2Writer = - IcebergV2Writer( - icebergTableWriterFactory = icebergTableWriterFactory, + val s3DataLakeWriter = + S3DataLakeWriter( + s3DataLakeTableWriterFactory = s3DataLakeTableWriterFactory, icebergConfiguration = icebergConfiguration, - icebergUtil = icebergUtil, + s3DataLakeUtil = s3DataLakeUtil, ) - val streamLoader = icebergV2Writer.createStreamLoader(stream = stream) + val streamLoader = s3DataLakeWriter.createStreamLoader(stream = stream) assertNotNull(streamLoader) } @@ -161,7 +161,7 @@ internal class IcebergV2WriterTest { Schema( Types.NestedField.of(2, true, "name", Types.StringType.get()), ) - val icebergTableWriterFactory: IcebergTableWriterFactory = mockk() + val s3DataLakeTableWriterFactory: S3DataLakeTableWriterFactory = mockk() val awsConfiguration: AWSAccessKeyConfiguration = mockk { every { accessKeyId } returns "access-key" every { secretAccessKey } returns "secret-access-key" @@ -177,14 +177,14 @@ internal class IcebergV2WriterTest { every { catalogConfiguration } returns NessieCatalogConfiguration("http://localhost:8080/api/v1", "access-token") } - val icebergConfiguration: IcebergV2Configuration = mockk { + val icebergConfiguration: S3DataLakeConfiguration = mockk { every { awsAccessKeyConfiguration } returns awsConfiguration every { icebergCatalogConfiguration } returns icebergCatalogConfig every { s3BucketConfiguration } returns bucketConfiguration } val catalog: Catalog = mockk() val table: Table = mockk { every { schema() } returns icebergSchema } - val icebergUtil: IcebergUtil = mockk { + val s3DataLakeUtil: S3DataLakeUtil = mockk { every { createCatalog(any(), any()) } returns catalog every { createTable(any(), any(), any(), any()) } returns table every { toCatalogProperties(any()) } returns mapOf() @@ -194,15 +194,15 @@ internal class IcebergV2WriterTest { pipeline.finalSchema.withAirbyteMeta(true).toIcebergSchema(emptyList()) } } - val icebergV2Writer = - IcebergV2Writer( - icebergTableWriterFactory = icebergTableWriterFactory, + val s3DataLakeWriter = + S3DataLakeWriter( + s3DataLakeTableWriterFactory = s3DataLakeTableWriterFactory, icebergConfiguration = icebergConfiguration, - icebergUtil = icebergUtil, + s3DataLakeUtil = s3DataLakeUtil, ) val e = assertThrows { - icebergV2Writer.createStreamLoader(stream = stream) + s3DataLakeWriter.createStreamLoader(stream = stream) } assertTrue( e.message?.startsWith("Table schema fields are different than catalog schema") ?: false @@ -268,7 +268,7 @@ internal class IcebergV2WriterTest { Types.NestedField.of(12, false, COLUMN_NAME_AB_GENERATION_ID, Types.LongType.get()), ) val icebergSchema = Schema(columns, emptySet()) - val icebergTableWriterFactory: IcebergTableWriterFactory = mockk() + val s3DataLakeTableWriterFactory: S3DataLakeTableWriterFactory = mockk() val awsConfiguration: AWSAccessKeyConfiguration = mockk { every { accessKeyId } returns "access-key" every { secretAccessKey } returns "secret-access-key" @@ -284,14 +284,14 @@ internal class IcebergV2WriterTest { every { catalogConfiguration } returns NessieCatalogConfiguration("http://localhost:8080/api/v1", "access-token") } - val icebergConfiguration: IcebergV2Configuration = mockk { + val icebergConfiguration: S3DataLakeConfiguration = mockk { every { awsAccessKeyConfiguration } returns awsConfiguration every { icebergCatalogConfiguration } returns icebergCatalogConfig every { s3BucketConfiguration } returns bucketConfiguration } val catalog: Catalog = mockk() val table: Table = mockk { every { schema() } returns icebergSchema } - val icebergUtil: IcebergUtil = mockk { + val s3DataLakeUtil: S3DataLakeUtil = mockk { every { createCatalog(any(), any()) } returns catalog every { createTable(any(), any(), any(), any()) } returns table every { toCatalogProperties(any()) } returns mapOf() @@ -301,15 +301,15 @@ internal class IcebergV2WriterTest { pipeline.finalSchema.withAirbyteMeta(true).toIcebergSchema(listOf(primaryKeys)) } } - val icebergV2Writer = - IcebergV2Writer( - icebergTableWriterFactory = icebergTableWriterFactory, + val s3DataLakeWriter = + S3DataLakeWriter( + s3DataLakeTableWriterFactory = s3DataLakeTableWriterFactory, icebergConfiguration = icebergConfiguration, - icebergUtil = icebergUtil, + s3DataLakeUtil = s3DataLakeUtil, ) val e = assertThrows { - icebergV2Writer.createStreamLoader(stream = stream) + s3DataLakeWriter.createStreamLoader(stream = stream) } assertTrue(e.message?.startsWith("Identifier fields are different") ?: false) } diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/IcebergTableCleanerTest.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeTableCleanerTest.kt similarity index 85% rename from airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/IcebergTableCleanerTest.kt rename to airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeTableCleanerTest.kt index 6db980c0b3c0..d976ee0bd8cb 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/IcebergTableCleanerTest.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeTableCleanerTest.kt @@ -23,17 +23,17 @@ import org.apache.iceberg.io.FileIO import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow -internal class IcebergTableCleanerTest { +internal class S3DataLakeTableCleanerTest { @Test fun testClearingTableWithPrefix() { val catalog: Catalog = mockk { every { dropTable(any(), true) } returns true } - val icebergUtil: IcebergUtil = mockk() + val s3DataLakeUtil: S3DataLakeUtil = mockk() val tableIdentifier: TableIdentifier = mockk() val fileIo: S3FileIO = mockk { every { deletePrefix(any()) } returns Unit } val tableLocation = "table/location" - val cleaner = IcebergTableCleaner(icebergUtil = icebergUtil) + val cleaner = S3DataLakeTableCleaner(s3DataLakeUtil = s3DataLakeUtil) cleaner.clearTable( catalog = catalog, @@ -49,12 +49,12 @@ internal class IcebergTableCleanerTest { @Test fun testClearingTableWithoutPrefix() { val catalog: Catalog = mockk { every { dropTable(any(), true) } returns true } - val icebergUtil: IcebergUtil = mockk() + val s3DataLakeUtil: S3DataLakeUtil = mockk() val tableIdentifier: TableIdentifier = mockk() val fileIo: FileIO = mockk() val tableLocation = "table/location" - val cleaner = IcebergTableCleaner(icebergUtil = icebergUtil) + val cleaner = S3DataLakeTableCleaner(s3DataLakeUtil = s3DataLakeUtil) cleaner.clearTable( catalog = catalog, @@ -69,10 +69,10 @@ internal class IcebergTableCleanerTest { @Test fun `deleteGenerationId handles empty scan results gracefully`() { - val icebergUtil: IcebergUtil = mockk { + val s3DataLakeUtil: S3DataLakeUtil = mockk { every { assertGenerationIdSuffixIsOfValidFormat(any()) } returns Unit } - val cleaner = IcebergTableCleaner(icebergUtil = icebergUtil) + val cleaner = S3DataLakeTableCleaner(s3DataLakeUtil = s3DataLakeUtil) val generationIdSuffix = "ab-generation-id-0-e" val tasks = CloseableIterable.empty() @@ -87,10 +87,10 @@ internal class IcebergTableCleanerTest { @Test fun `deleteGenerationId deletes matching file via deleteFile`() { - val icebergUtil: IcebergUtil = mockk { + val s3DataLakeUtil: S3DataLakeUtil = mockk { every { assertGenerationIdSuffixIsOfValidFormat(any()) } returns Unit } - val cleaner = IcebergTableCleaner(icebergUtil = icebergUtil) + val cleaner = S3DataLakeTableCleaner(s3DataLakeUtil = s3DataLakeUtil) val generationIdSuffix = "ab-generation-id-0-e" val filePathToDelete = "path/to/gen-5678/foo-bar-ab-generation-id-0-e.parquet" val fileScanTask = mockk() @@ -114,7 +114,7 @@ internal class IcebergTableCleanerTest { } verify { - icebergUtil.assertGenerationIdSuffixIsOfValidFormat(generationIdSuffix) + s3DataLakeUtil.assertGenerationIdSuffixIsOfValidFormat(generationIdSuffix) table.newDelete().toBranch(eq("staging")) delete.deleteFile(filePathToDelete) delete.commit() @@ -123,10 +123,10 @@ internal class IcebergTableCleanerTest { @Test fun `deleteGenerationId should not delete non matching file via deleteFile`() { - val icebergUtil: IcebergUtil = mockk { + val s3DataLakeUtil: S3DataLakeUtil = mockk { every { assertGenerationIdSuffixIsOfValidFormat(any()) } returns Unit } - val cleaner = IcebergTableCleaner(icebergUtil = icebergUtil) + val cleaner = S3DataLakeTableCleaner(s3DataLakeUtil = s3DataLakeUtil) val generationIdSuffix = "ab-generation-id-10-e" val filePathToDelete = "path/to/gen-5678/foo-bar-ab-generation-id-10-e.parquet" val fileScanTask = mockk() @@ -150,7 +150,7 @@ internal class IcebergTableCleanerTest { } verify(exactly = 0) { - icebergUtil.assertGenerationIdSuffixIsOfValidFormat(generationIdSuffix) + s3DataLakeUtil.assertGenerationIdSuffixIsOfValidFormat(generationIdSuffix) table.newDelete().toBranch(any()) delete.deleteFile(any()) delete.commit() diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/IcebergTableWriterFactoryTest.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeTableWriterFactoryTest.kt similarity index 94% rename from airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/IcebergTableWriterFactoryTest.kt rename to airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeTableWriterFactoryTest.kt index 0917a0274e93..cc17aa846a2b 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/IcebergTableWriterFactoryTest.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeTableWriterFactoryTest.kt @@ -24,7 +24,7 @@ import org.apache.iceberg.types.Types import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test -internal class IcebergTableWriterFactoryTest { +internal class S3DataLakeTableWriterFactoryTest { private val generationIdSuffix: String = "ab-generation-id-0-e" @@ -67,11 +67,11 @@ internal class IcebergTableWriterFactoryTest { every { schema() } returns tableSchema every { spec() } returns tableSpec } - val icebergUtil: IcebergUtil = mockk { + val s3DataLakeUtil: S3DataLakeUtil = mockk { every { assertGenerationIdSuffixIsOfValidFormat(any()) } returns Unit } - val factory = IcebergTableWriterFactory(icebergUtil = icebergUtil) + val factory = S3DataLakeTableWriterFactory(s3DataLakeUtil = s3DataLakeUtil) val writer = factory.create( table = table, @@ -125,11 +125,11 @@ internal class IcebergTableWriterFactoryTest { every { schema() } returns tableSchema every { spec() } returns tableSpec } - val icebergUtil: IcebergUtil = mockk { + val s3DataLakeUtil: S3DataLakeUtil = mockk { every { assertGenerationIdSuffixIsOfValidFormat(any()) } returns Unit } - val factory = IcebergTableWriterFactory(icebergUtil = icebergUtil) + val factory = S3DataLakeTableWriterFactory(s3DataLakeUtil = s3DataLakeUtil) val writer = factory.create( table = table, @@ -182,11 +182,11 @@ internal class IcebergTableWriterFactoryTest { every { schema() } returns tableSchema every { spec() } returns tableSpec } - val icebergUtil: IcebergUtil = mockk { + val s3DataLakeUtil: S3DataLakeUtil = mockk { every { assertGenerationIdSuffixIsOfValidFormat(any()) } returns Unit } - val factory = IcebergTableWriterFactory(icebergUtil = icebergUtil) + val factory = S3DataLakeTableWriterFactory(s3DataLakeUtil = s3DataLakeUtil) val writer = factory.create( table = table, @@ -235,11 +235,11 @@ internal class IcebergTableWriterFactoryTest { every { schema() } returns tableSchema every { spec() } returns tableSpec } - val icebergUtil: IcebergUtil = mockk { + val s3DataLakeUtil: S3DataLakeUtil = mockk { every { assertGenerationIdSuffixIsOfValidFormat(any()) } returns Unit } - val factory = IcebergTableWriterFactory(icebergUtil = icebergUtil) + val factory = S3DataLakeTableWriterFactory(s3DataLakeUtil = s3DataLakeUtil) val writer = factory.create( table = table, diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/IcebergUtilTest.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeUtilTest.kt similarity index 93% rename from airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/IcebergUtilTest.kt rename to airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeUtilTest.kt index 3abe0e76ade7..ce12721a7419 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/IcebergUtilTest.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeUtilTest.kt @@ -27,7 +27,7 @@ import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_EXTRACTED_AT import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_GENERATION_ID import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_META import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_RAW_ID -import io.airbyte.integrations.destination.s3_data_lake.IcebergV2Configuration +import io.airbyte.integrations.destination.s3_data_lake.S3DataLakeConfiguration import io.airbyte.integrations.destination.s3_data_lake.SimpleTableIdGenerator import io.mockk.every import io.mockk.mockk @@ -51,14 +51,14 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow import org.junit.jupiter.api.assertThrows -internal class IcebergUtilTest { +internal class S3DataLakeUtilTest { - private lateinit var icebergUtil: IcebergUtil + private lateinit var s3DataLakeUtil: S3DataLakeUtil private val tableIdGenerator = SimpleTableIdGenerator() @BeforeEach fun setup() { - icebergUtil = IcebergUtil(tableIdGenerator) + s3DataLakeUtil = S3DataLakeUtil(tableIdGenerator) } @Test @@ -70,7 +70,8 @@ internal class IcebergUtilTest { URI to "http://localhost:19120/api/v1", WAREHOUSE_LOCATION to "s3://test/" ) - val catalog = icebergUtil.createCatalog(catalogName = catalogName, properties = properties) + val catalog = + s3DataLakeUtil.createCatalog(catalogName = catalogName, properties = properties) assertNotNull(catalog) assertEquals(catalogName, catalog.name()) assertEquals(NessieCatalog::class.java, catalog.javaClass) @@ -98,7 +99,7 @@ internal class IcebergUtilTest { false } val table = - icebergUtil.createTable( + s3DataLakeUtil.createTable( streamDescriptor = streamDescriptor, catalog = catalog, schema = schema, @@ -134,7 +135,7 @@ internal class IcebergUtilTest { false } val table = - icebergUtil.createTable( + s3DataLakeUtil.createTable( streamDescriptor = streamDescriptor, catalog = catalog, schema = schema, @@ -161,7 +162,7 @@ internal class IcebergUtilTest { every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns true } val table = - icebergUtil.createTable( + s3DataLakeUtil.createTable( streamDescriptor = streamDescriptor, catalog = catalog, schema = schema, @@ -214,7 +215,7 @@ internal class IcebergUtilTest { ) val schema = Schema(columns) val icebergRecord = - icebergUtil.toRecord( + s3DataLakeUtil.toRecord( record = airbyteRecord, pipeline = pipeline, tableSchema = schema, @@ -266,7 +267,7 @@ internal class IcebergUtilTest { ) val schema = Schema(columns, setOf(1)) val icebergRecord = - icebergUtil.toRecord( + s3DataLakeUtil.toRecord( record = airbyteRecord, pipeline = pipeline, tableSchema = schema, @@ -313,7 +314,7 @@ internal class IcebergUtilTest { ) val schema = Schema(columns, setOf(1)) val icebergRecord = - icebergUtil.toRecord( + s3DataLakeUtil.toRecord( record = airbyteRecord, pipeline = pipeline, tableSchema = schema, @@ -351,12 +352,12 @@ internal class IcebergUtilTest { NessieCatalogConfiguration(nessieServerUri, nessieAccessToken) ) val configuration = - IcebergV2Configuration( + S3DataLakeConfiguration( awsAccessKeyConfiguration = awsAccessKeyConfiguration, icebergCatalogConfiguration = icebergCatalogConfiguration, s3BucketConfiguration = s3BucketConfiguration, ) - val catalogProperties = icebergUtil.toCatalogProperties(config = configuration) + val catalogProperties = s3DataLakeUtil.toCatalogProperties(config = configuration) assertEquals(ICEBERG_CATALOG_TYPE_NESSIE, catalogProperties[ICEBERG_CATALOG_TYPE]) assertEquals(nessieServerUri, catalogProperties[URI]) assertEquals(warehouseLocation, catalogProperties[WAREHOUSE_LOCATION]) @@ -374,7 +375,7 @@ internal class IcebergUtilTest { fun `assertGenerationIdSuffixIsOfValidFormat accepts valid format`() { val validGenerationId = "ab-generation-id-123-e" assertDoesNotThrow { - icebergUtil.assertGenerationIdSuffixIsOfValidFormat(validGenerationId) + s3DataLakeUtil.assertGenerationIdSuffixIsOfValidFormat(validGenerationId) } } @@ -382,8 +383,8 @@ internal class IcebergUtilTest { fun `assertGenerationIdSuffixIsOfValidFormat throws exception for invalid prefix`() { val invalidGenerationId = "invalid-generation-id-123" val exception = - assertThrows { - icebergUtil.assertGenerationIdSuffixIsOfValidFormat(invalidGenerationId) + assertThrows { + s3DataLakeUtil.assertGenerationIdSuffixIsOfValidFormat(invalidGenerationId) } assertEquals( "Invalid format: $invalidGenerationId. Expected format is 'ab-generation-id--e'", @@ -395,8 +396,8 @@ internal class IcebergUtilTest { fun `assertGenerationIdSuffixIsOfValidFormat throws exception for missing number`() { val invalidGenerationId = "ab-generation-id-" val exception = - assertThrows { - icebergUtil.assertGenerationIdSuffixIsOfValidFormat(invalidGenerationId) + assertThrows { + s3DataLakeUtil.assertGenerationIdSuffixIsOfValidFormat(invalidGenerationId) } assertEquals( "Invalid format: $invalidGenerationId. Expected format is 'ab-generation-id--e'", @@ -409,7 +410,7 @@ internal class IcebergUtilTest { val stream = mockk() every { stream.generationId } returns 42 val expectedSuffix = "ab-generation-id-42-e" - val result = icebergUtil.constructGenerationIdSuffix(stream) + val result = s3DataLakeUtil.constructGenerationIdSuffix(stream) assertEquals(expectedSuffix, result) } @@ -419,7 +420,7 @@ internal class IcebergUtilTest { every { stream.generationId } returns -1 val exception = assertThrows { - icebergUtil.constructGenerationIdSuffix(stream) + s3DataLakeUtil.constructGenerationIdSuffix(stream) } assertEquals( "GenerationId must be non-negative. Provided: ${stream.generationId}", @@ -447,7 +448,7 @@ internal class IcebergUtilTest { syncId = 1, ) val pipeline = ParquetMapperPipelineFactory().create(stream) - val schema = icebergUtil.toIcebergSchema(stream = stream, pipeline = pipeline) + val schema = s3DataLakeUtil.toIcebergSchema(stream = stream, pipeline = pipeline) assertEquals(primaryKeys.toSet(), schema.identifierFieldNames()) assertEquals(6, schema.columns().size) assertNotNull(schema.findField("id")) @@ -477,7 +478,7 @@ internal class IcebergUtilTest { syncId = 1, ) val pipeline = ParquetMapperPipelineFactory().create(stream) - val schema = icebergUtil.toIcebergSchema(stream = stream, pipeline = pipeline) + val schema = s3DataLakeUtil.toIcebergSchema(stream = stream, pipeline = pipeline) assertEquals(emptySet(), schema.identifierFieldNames()) assertEquals(6, schema.columns().size) assertNotNull(schema.findField("id")) diff --git a/docs/integrations/destinations/s3-data-lake.md b/docs/integrations/destinations/s3-data-lake.md index 29c390f1d9ad..15c0008db508 100644 --- a/docs/integrations/destinations/s3-data-lake.md +++ b/docs/integrations/destinations/s3-data-lake.md @@ -15,10 +15,11 @@ for more information.
Expand to review -| Version | Date | Pull Request | Subject | -|:--------|:-----------|:------------------------------------------------------------|:--------------------------------------------| -| 0.2.8 | 2025-01-09 | [\#51012](https://github.com/airbytehq/airbyte/pull/50991) | Rename/Cleanup package name from Iceberg V2 | -| 0.2.7 | 2025-01-09 | [\#50957](https://github.com/airbytehq/airbyte/pull/50991) | Add support for GLUE RBAC (Assume role) | -| 0.2.6 | 2025-01-08 | [\#50991](https://github.com/airbytehq/airbyte/pull/50991) | Initial public release. | +| Version | Date | Pull Request | Subject | +|:--------|:-----------|:------------------------------------------------------------|:---------------------------------------------| +| 0.2.9 | 2025-01-09 | [\#51022](https://github.com/airbytehq/airbyte/pull/51022) | Rename all classes and files from Iceberg V2 | +| 0.2.8 | 2025-01-09 | [\#51012](https://github.com/airbytehq/airbyte/pull/51012) | Rename/Cleanup package name from Iceberg V2 | +| 0.2.7 | 2025-01-09 | [\#50957](https://github.com/airbytehq/airbyte/pull/50957) | Add support for GLUE RBAC (Assume role) | +| 0.2.6 | 2025-01-08 | [\#50991](https://github.com/airbytehq/airbyte/pull/50991) | Initial public release. |