Factor out Hadoop DynamoDB jobs configuration
julienrf committed Mar 18, 2024
1 parent afcbfd8 commit 8e9728d
Showing 3 changed files with 71 additions and 33 deletions.
53 changes: 51 additions & 2 deletions migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala
@@ -4,7 +4,6 @@ import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.services.dynamodbv2.{
AmazonDynamoDBClientBuilder,
AmazonDynamoDBStreamsClient,
AmazonDynamoDBStreamsClientBuilder
}
import com.amazonaws.services.dynamodbv2.model.{
@@ -17,7 +16,14 @@ import com.amazonaws.services.dynamodbv2.model.{
TableDescription,
UpdateTableRequest
}
import com.scylladb.migrator.config.{ DynamoDBEndpoint, SourceSettings, TargetSettings }
import com.scylladb.migrator.config.{
AWSCredentials,
DynamoDBEndpoint,
SourceSettings,
TargetSettings
}
import org.apache.hadoop.dynamodb.DynamoDBConstants
import org.apache.hadoop.mapred.JobConf
import org.apache.log4j.LogManager

import scala.util.{ Failure, Success, Try }
@@ -132,4 +138,47 @@ object DynamoUtils {

    builder.build()
  }

  /**
    * Optionally set a configuration. If `maybeValue` is empty, nothing is done. Otherwise,
    * its value is set to the `name` property on the `jobConf`.
    *
    * @param jobConf Target JobConf to configure
    * @param name Name of the Hadoop configuration key
    * @param maybeValue Optional value to set.
    */
  def setOptionalConf(jobConf: JobConf, name: String, maybeValue: Option[String]): Unit =
    for (value <- maybeValue) {
      jobConf.set(name, value)
    }

  /**
    * Set the common configuration of both read and write DynamoDB jobs.
    * @param jobConf Target JobConf to configure
    * @param maybeRegion AWS region
    * @param maybeEndpoint AWS endpoint
    * @param maybeScanSegments Scan segments
    * @param maybeMaxMapTasks Max map tasks
    * @param maybeAwsCredentials AWS credentials
    */
  def setDynamoDBJobConf(jobConf: JobConf,
                         maybeRegion: Option[String],
                         maybeEndpoint: Option[DynamoDBEndpoint],
                         maybeScanSegments: Option[Int],
                         maybeMaxMapTasks: Option[Int],
                         maybeAwsCredentials: Option[AWSCredentials]): Unit = {
    setOptionalConf(jobConf, DynamoDBConstants.REGION, maybeRegion)
    setOptionalConf(jobConf, DynamoDBConstants.ENDPOINT, maybeEndpoint.map(_.renderEndpoint))
    setOptionalConf(jobConf, DynamoDBConstants.SCAN_SEGMENTS, maybeScanSegments.map(_.toString))
    setOptionalConf(jobConf, DynamoDBConstants.MAX_MAP_TASKS, maybeMaxMapTasks.map(_.toString))
    for (credentials <- maybeAwsCredentials) {
      jobConf.set(DynamoDBConstants.DYNAMODB_ACCESS_KEY_CONF, credentials.accessKey)
      jobConf.set(DynamoDBConstants.DYNAMODB_SECRET_KEY_CONF, credentials.secretKey)
    }
    jobConf.set(
      "mapred.output.format.class",
      "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
    jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
  }

}
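For context, here is a minimal sketch of how a caller could use the factored-out helpers. Only setDynamoDBJobConf and setOptionalConf come from the diff above; the object name, the concrete values, and the assumption that AWSCredentials is a case class taking accessKey and secretKey are illustrative, not part of this commit.

import com.scylladb.migrator.DynamoUtils.{ setDynamoDBJobConf, setOptionalConf }
import com.scylladb.migrator.config.AWSCredentials
import org.apache.hadoop.dynamodb.DynamoDBConstants
import org.apache.hadoop.mapred.JobConf

object DynamoDBJobConfExample {
  def main(args: Array[String]): Unit = {
    val jobConf = new JobConf()
    // Shared settings (region, endpoint, parallelism, credentials) now go through one helper.
    setDynamoDBJobConf(
      jobConf,
      maybeRegion = Some("us-east-1"),
      maybeEndpoint = None,
      maybeScanSegments = Some(8),
      maybeMaxMapTasks = None,
      maybeAwsCredentials = Some(AWSCredentials(accessKey = "<access-key>", secretKey = "<secret-key>")))
    // Job-specific keys are still set on the JobConf directly, e.g. the table to scan.
    jobConf.set(DynamoDBConstants.INPUT_TABLE_NAME, "example-table")
    // Optional keys use setOptionalConf, so an absent value leaves the configuration untouched.
    setOptionalConf(jobConf, DynamoDBConstants.READ_THROUGHPUT, maybeValue = None)
    println(jobConf.get(DynamoDBConstants.REGION)) // prints "us-east-1"
  }
}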
migrator/src/main/scala/com/scylladb/migrator/readers/DynamoDB.scala
@@ -1,6 +1,7 @@
package com.scylladb.migrator.readers

import com.amazonaws.services.dynamodbv2.model.TableDescription
import com.scylladb.migrator.DynamoUtils.{ setDynamoDBJobConf, setOptionalConf }
import com.scylladb.migrator.config.SourceSettings
import org.apache.hadoop.dynamodb.{ DynamoDBConstants, DynamoDBItemWritable }
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
@@ -25,25 +26,19 @@ object DynamoDB {
else None

val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration)
def setOptionalConf(name: String, maybeValue: Option[String]): Unit =
for (value <- maybeValue) {
jobConf.set(name, value)
}
setDynamoDBJobConf(
jobConf,
source.region,
source.endpoint,
source.scanSegments,
source.maxMapTasks,
source.credentials)
jobConf.set(DynamoDBConstants.INPUT_TABLE_NAME, source.table)
setOptionalConf(DynamoDBConstants.REGION, source.region)
setOptionalConf(DynamoDBConstants.ENDPOINT, source.endpoint.map(_.renderEndpoint))
setOptionalConf(DynamoDBConstants.READ_THROUGHPUT, throughput)
setOptionalConf(jobConf, DynamoDBConstants.READ_THROUGHPUT, throughput)
setOptionalConf(
jobConf,
DynamoDBConstants.THROUGHPUT_READ_PERCENT,
source.throughputReadPercent.map(_.toString))
setOptionalConf(DynamoDBConstants.SCAN_SEGMENTS, source.scanSegments.map(_.toString))
setOptionalConf(DynamoDBConstants.MAX_MAP_TASKS, source.maxMapTasks.map(_.toString))
setOptionalConf(DynamoDBConstants.DYNAMODB_ACCESS_KEY_CONF, source.credentials.map(_.accessKey))
setOptionalConf(DynamoDBConstants.DYNAMODB_SECRET_KEY_CONF, source.credentials.map(_.secretKey))
jobConf.set(
"mapred.output.format.class",
"org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")

spark.sparkContext.hadoopRDD(
jobConf,
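The reader's hadoopRDD call is cut off by the collapsed diff above. For illustration only, a JobConf configured this way is typically consumed as shown below; this is a sketch assuming the connector's usual (Text, DynamoDBItemWritable) key/value types, not the verbatim continuation of this file.

import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object DynamoDBReadSketch {
  // The EMR DynamoDB connector exposes the table as (Text, DynamoDBItemWritable) pairs,
  // and DynamoDBInputFormat reads the scan segments configured on the JobConf.
  def readTable(spark: SparkSession, jobConf: JobConf): RDD[(Text, DynamoDBItemWritable)] =
    spark.sparkContext.hadoopRDD(
      jobConf,
      classOf[DynamoDBInputFormat],
      classOf[Text],
      classOf[DynamoDBItemWritable])
}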
migrator/src/main/scala/com/scylladb/migrator/writers/DynamoDB.scala
@@ -1,6 +1,7 @@
package com.scylladb.migrator.writers

import com.amazonaws.services.dynamodbv2.model.TableDescription
import com.scylladb.migrator.DynamoUtils.{ setDynamoDBJobConf, setOptionalConf }
import com.scylladb.migrator.config.{ Rename, TargetSettings }
import org.apache.hadoop.dynamodb.{ DynamoDBConstants, DynamoDBItemWritable }
import org.apache.hadoop.io.Text
@@ -26,26 +27,19 @@ object DynamoDB {

val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration)

def setOptionalConf(name: String, maybeValue: Option[String]): Unit =
for (value <- maybeValue) {
jobConf.set(name, value)
}

setDynamoDBJobConf(
jobConf,
target.region,
target.endpoint,
target.scanSegments,
target.maxMapTasks,
target.credentials)
jobConf.set(DynamoDBConstants.OUTPUT_TABLE_NAME, target.table)
setOptionalConf(DynamoDBConstants.REGION, target.region)
setOptionalConf(DynamoDBConstants.ENDPOINT, target.endpoint.map(_.renderEndpoint))
setOptionalConf(DynamoDBConstants.READ_THROUGHPUT, throughput)
setOptionalConf(jobConf, DynamoDBConstants.WRITE_THROUGHPUT, throughput)
setOptionalConf(
jobConf,
DynamoDBConstants.THROUGHPUT_WRITE_PERCENT,
target.throughputWritePercent.map(_.toString))
setOptionalConf(DynamoDBConstants.SCAN_SEGMENTS, target.scanSegments.map(_.toString))
setOptionalConf(DynamoDBConstants.MAX_MAP_TASKS, target.maxMapTasks.map(_.toString))
setOptionalConf(DynamoDBConstants.DYNAMODB_ACCESS_KEY_CONF, target.credentials.map(_.accessKey))
setOptionalConf(DynamoDBConstants.DYNAMODB_SECRET_KEY_CONF, target.credentials.map(_.secretKey))
jobConf.set(
"mapred.output.format.class",
"org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")

rdd
.mapValues { itemWritable =>
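The writer's diff is also truncated above. As a rough sketch (assuming a pair RDD of the connector's (Text, DynamoDBItemWritable) types; not necessarily this file's actual continuation), the write goes through the output format registered on the JobConf:

import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.RDD

object DynamoDBWriteSketch {
  // setDynamoDBJobConf registered org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat as
  // mapred.output.format.class, so saveAsHadoopDataset sends each pair to the target table.
  def writeTable(rdd: RDD[(Text, DynamoDBItemWritable)], jobConf: JobConf): Unit =
    rdd.saveAsHadoopDataset(jobConf)
}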
