From 8e9728d63ff2cdf79f5bb5195b607ba80e684cbe Mon Sep 17 00:00:00 2001
From: Julien Richard-Foy
Date: Mon, 18 Mar 2024 09:59:46 +0100
Subject: [PATCH] Factor out Hadoop DynamoDB jobs configuration

---
 .../com/scylladb/migrator/DynamoUtils.scala   | 53 ++++++++++++++++++-
 .../scylladb/migrator/readers/DynamoDB.scala  | 25 ++++-----
 .../scylladb/migrator/writers/DynamoDB.scala  | 26 ++++-----
 3 files changed, 71 insertions(+), 33 deletions(-)

diff --git a/migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala b/migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala
index 839b10c3..1fd259a4 100644
--- a/migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala
+++ b/migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala
@@ -4,7 +4,6 @@ import com.amazonaws.auth.AWSCredentialsProvider
 import com.amazonaws.client.builder.AwsClientBuilder
 import com.amazonaws.services.dynamodbv2.{
   AmazonDynamoDBClientBuilder,
-  AmazonDynamoDBStreamsClient,
   AmazonDynamoDBStreamsClientBuilder
 }
 import com.amazonaws.services.dynamodbv2.model.{
@@ -17,7 +16,14 @@ import com.amazonaws.services.dynamodbv2.model.{
   TableDescription,
   UpdateTableRequest
 }
-import com.scylladb.migrator.config.{ DynamoDBEndpoint, SourceSettings, TargetSettings }
+import com.scylladb.migrator.config.{
+  AWSCredentials,
+  DynamoDBEndpoint,
+  SourceSettings,
+  TargetSettings
+}
+import org.apache.hadoop.dynamodb.DynamoDBConstants
+import org.apache.hadoop.mapred.JobConf
 import org.apache.log4j.LogManager
 
 import scala.util.{ Failure, Success, Try }
@@ -132,4 +138,47 @@ object DynamoUtils {
 
     builder.build()
   }
+
+  /**
+    * Optionally set a configuration. If `maybeValue` is empty, nothing is done. Otherwise,
+    * its value is set to the `name` property on the `jobConf`.
+    *
+    * @param jobConf Target JobConf to configure
+    * @param name Name of the Hadoop configuration key
+    * @param maybeValue Optional value to set.
+    */
+  def setOptionalConf(jobConf: JobConf, name: String, maybeValue: Option[String]): Unit =
+    for (value <- maybeValue) {
+      jobConf.set(name, value)
+    }
+
+  /**
+    * Set the common configuration of both read and write DynamoDB jobs.
+    * @param jobConf Target JobConf to configure
+    * @param maybeRegion AWS region
+    * @param maybeEndpoint AWS endpoint
+    * @param maybeScanSegments Scan segments
+    * @param maybeMaxMapTasks Max map tasks
+    * @param maybeAwsCredentials AWS credentials
+    */
+  def setDynamoDBJobConf(jobConf: JobConf,
+                         maybeRegion: Option[String],
+                         maybeEndpoint: Option[DynamoDBEndpoint],
+                         maybeScanSegments: Option[Int],
+                         maybeMaxMapTasks: Option[Int],
+                         maybeAwsCredentials: Option[AWSCredentials]): Unit = {
+    setOptionalConf(jobConf, DynamoDBConstants.REGION, maybeRegion)
+    setOptionalConf(jobConf, DynamoDBConstants.ENDPOINT, maybeEndpoint.map(_.renderEndpoint))
+    setOptionalConf(jobConf, DynamoDBConstants.SCAN_SEGMENTS, maybeScanSegments.map(_.toString))
+    setOptionalConf(jobConf, DynamoDBConstants.MAX_MAP_TASKS, maybeMaxMapTasks.map(_.toString))
+    for (credentials <- maybeAwsCredentials) {
+      jobConf.set(DynamoDBConstants.DYNAMODB_ACCESS_KEY_CONF, credentials.accessKey)
+      jobConf.set(DynamoDBConstants.DYNAMODB_SECRET_KEY_CONF, credentials.secretKey)
+    }
+    jobConf.set(
+      "mapred.output.format.class",
+      "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
+    jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
+  }
+
 }
diff --git a/migrator/src/main/scala/com/scylladb/migrator/readers/DynamoDB.scala b/migrator/src/main/scala/com/scylladb/migrator/readers/DynamoDB.scala
index 12c3a122..c29d4240 100644
--- a/migrator/src/main/scala/com/scylladb/migrator/readers/DynamoDB.scala
+++ b/migrator/src/main/scala/com/scylladb/migrator/readers/DynamoDB.scala
@@ -1,6 +1,7 @@
 package com.scylladb.migrator.readers
 
 import com.amazonaws.services.dynamodbv2.model.TableDescription
+import com.scylladb.migrator.DynamoUtils.{ setDynamoDBJobConf, setOptionalConf }
 import com.scylladb.migrator.config.SourceSettings
 import org.apache.hadoop.dynamodb.{ DynamoDBConstants, DynamoDBItemWritable }
 import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
@@ -25,25 +26,19 @@ object DynamoDB {
       else None
 
     val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration)
-    def setOptionalConf(name: String, maybeValue: Option[String]): Unit =
-      for (value <- maybeValue) {
-        jobConf.set(name, value)
-      }
+    setDynamoDBJobConf(
+      jobConf,
+      source.region,
+      source.endpoint,
+      source.scanSegments,
+      source.maxMapTasks,
+      source.credentials)
     jobConf.set(DynamoDBConstants.INPUT_TABLE_NAME, source.table)
-    setOptionalConf(DynamoDBConstants.REGION, source.region)
-    setOptionalConf(DynamoDBConstants.ENDPOINT, source.endpoint.map(_.renderEndpoint))
-    setOptionalConf(DynamoDBConstants.READ_THROUGHPUT, throughput)
+    setOptionalConf(jobConf, DynamoDBConstants.READ_THROUGHPUT, throughput)
     setOptionalConf(
+      jobConf,
       DynamoDBConstants.THROUGHPUT_READ_PERCENT,
       source.throughputReadPercent.map(_.toString))
-    setOptionalConf(DynamoDBConstants.SCAN_SEGMENTS, source.scanSegments.map(_.toString))
-    setOptionalConf(DynamoDBConstants.MAX_MAP_TASKS, source.maxMapTasks.map(_.toString))
-    setOptionalConf(DynamoDBConstants.DYNAMODB_ACCESS_KEY_CONF, source.credentials.map(_.accessKey))
-    setOptionalConf(DynamoDBConstants.DYNAMODB_SECRET_KEY_CONF, source.credentials.map(_.secretKey))
-    jobConf.set(
-      "mapred.output.format.class",
-      "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
-    jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
 
     spark.sparkContext.hadoopRDD(
       jobConf,
diff --git a/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoDB.scala b/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoDB.scala
index dab4dbc7..c5542cf8 100644
--- a/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoDB.scala
+++ b/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoDB.scala
@@ -1,6 +1,7 @@
 package com.scylladb.migrator.writers
 
 import com.amazonaws.services.dynamodbv2.model.TableDescription
+import com.scylladb.migrator.DynamoUtils.{ setDynamoDBJobConf, setOptionalConf }
 import com.scylladb.migrator.config.{ Rename, TargetSettings }
 import org.apache.hadoop.dynamodb.{ DynamoDBConstants, DynamoDBItemWritable }
 import org.apache.hadoop.io.Text
@@ -26,26 +27,19 @@ object DynamoDB {
 
     val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration)
 
-    def setOptionalConf(name: String, maybeValue: Option[String]): Unit =
-      for (value <- maybeValue) {
-        jobConf.set(name, value)
-      }
-
+    setDynamoDBJobConf(
+      jobConf,
+      target.region,
+      target.endpoint,
+      target.scanSegments,
+      target.maxMapTasks,
+      target.credentials)
     jobConf.set(DynamoDBConstants.OUTPUT_TABLE_NAME, target.table)
-    setOptionalConf(DynamoDBConstants.REGION, target.region)
-    setOptionalConf(DynamoDBConstants.ENDPOINT, target.endpoint.map(_.renderEndpoint))
-    setOptionalConf(DynamoDBConstants.READ_THROUGHPUT, throughput)
+    setOptionalConf(jobConf, DynamoDBConstants.WRITE_THROUGHPUT, throughput)
     setOptionalConf(
+      jobConf,
       DynamoDBConstants.THROUGHPUT_WRITE_PERCENT,
       target.throughputWritePercent.map(_.toString))
-    setOptionalConf(DynamoDBConstants.SCAN_SEGMENTS, target.scanSegments.map(_.toString))
-    setOptionalConf(DynamoDBConstants.MAX_MAP_TASKS, target.maxMapTasks.map(_.toString))
-    setOptionalConf(DynamoDBConstants.DYNAMODB_ACCESS_KEY_CONF, target.credentials.map(_.accessKey))
-    setOptionalConf(DynamoDBConstants.DYNAMODB_SECRET_KEY_CONF, target.credentials.map(_.secretKey))
-    jobConf.set(
-      "mapred.output.format.class",
-      "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
-    jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
 
     rdd
       .mapValues { itemWritable =>
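
For context, a minimal sketch (not part of the patch) of the call-site pattern this refactoring enables. The `readJobConf` helper name and its parameter list are illustrative assumptions; the individual calls mirror the updated readers.DynamoDB code in the diff above.

import com.scylladb.migrator.DynamoUtils.{ setDynamoDBJobConf, setOptionalConf }
import com.scylladb.migrator.config.{ AWSCredentials, DynamoDBEndpoint }
import org.apache.hadoop.dynamodb.DynamoDBConstants
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.SparkSession

// Hypothetical helper, shown only to illustrate how a read job is configured
// after this change.
def readJobConf(spark: SparkSession,
                table: String,
                region: Option[String],
                endpoint: Option[DynamoDBEndpoint],
                scanSegments: Option[Int],
                maxMapTasks: Option[Int],
                credentials: Option[AWSCredentials]): JobConf = {
  // Start from the Hadoop configuration carried by the Spark context.
  val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration)
  // A single call now sets the region, endpoint, scan segments, max map tasks,
  // credentials, and the DynamoDB input/output format classes.
  setDynamoDBJobConf(jobConf, region, endpoint, scanSegments, maxMapTasks, credentials)
  // Job-specific keys are still set individually.
  jobConf.set(DynamoDBConstants.INPUT_TABLE_NAME, table)
  jobConf
}

The write path is symmetric: it passes the target settings to setDynamoDBJobConf and then sets OUTPUT_TABLE_NAME and the write throughput keys, as shown in the writers.DynamoDB hunk above.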