Commit

Merge pull request #105 from julienrf/discard-schema-inference
Do not try to infer a schema when migrating from DynamoDB to Alternator
tarzanek authored Mar 18, 2024
2 parents a7d43e0 + 8e9728d commit b4c60d5
Showing 20 changed files with 529 additions and 337 deletions.
3 changes: 0 additions & 3 deletions .gitmodules
@@ -2,9 +2,6 @@
path = spark-cassandra-connector
url = https://github.com/scylladb/spark-cassandra-connector
branch = feature/track-token-ranges
[submodule "spark-dynamodb"]
path = spark-dynamodb
url = https://github.com/scylladb/spark-dynamodb
[submodule "spark-kinesis"]
path = spark-kinesis
url = https://github.com/scylladb/spark-kinesis
1 change: 1 addition & 0 deletions build.sbt
@@ -30,6 +30,7 @@ lazy val migrator = (project in file("migrator")).settings(
"com.amazonaws" % "aws-java-sdk-dynamodb" % awsSdkVersion,
("com.amazonaws" % "dynamodb-streams-kinesis-adapter" % "1.5.2")
.excludeAll(InclExclRule("com.fasterxml.jackson.core")),
"com.amazon.emr" % "emr-dynamodb-hadoop" % "4.16.0",
"org.yaml" % "snakeyaml" % "1.23",
"io.circe" %% "circe-yaml" % "0.9.0",
"io.circe" %% "circe-generic" % "0.9.0",
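The emr-dynamodb-hadoop dependency added above supplies the Hadoop DynamoDBInputFormat and DynamoDBOutputFormat that the rest of this commit wires into a Hadoop JobConf. As a rough sketch, assuming placeholder table and region values and the connector's INPUT_TABLE_NAME constant (an assumption; only REGION appears elsewhere in this commit), reading through that connector in Spark could look like this. It is an illustration, not the migrator's actual read path:

import org.apache.hadoop.dynamodb.{ DynamoDBConstants, DynamoDBItemWritable }
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.SparkSession

object RawDynamoDBReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("raw-dynamodb-read").getOrCreate()

    val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration)
    // Placeholder table and region; INPUT_TABLE_NAME is assumed to be the
    // connector's table-name key.
    jobConf.set(DynamoDBConstants.INPUT_TABLE_NAME, "source-table")
    jobConf.set(DynamoDBConstants.REGION, "us-east-1")

    // Each record is a raw DynamoDB item (an attribute-value map), so nothing
    // about the table's schema has to be inferred before copying it.
    val items = spark.sparkContext.hadoopRDD(
      jobConf,
      classOf[DynamoDBInputFormat],
      classOf[Text],
      classOf[DynamoDBItemWritable])

    println(s"Scanned ${items.count()} items")
    spark.stop()
  }
}
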
4 changes: 0 additions & 4 deletions build.sh
@@ -14,9 +14,6 @@ trap "rm -rf $TMPDIR" EXIT
pushd spark-cassandra-connector
sbt -Djava.io.tmpdir="$TMPDIR" ++2.11.12 assembly
popd
pushd spark-dynamodb
sbt assembly
popd
pushd spark-kinesis
sbt assembly
popd
@@ -26,7 +23,6 @@ if [ ! -d "./migrator/lib" ]; then
fi

cp ./spark-cassandra-connector/connector/target/scala-2.11/spark-cassandra-connector-assembly-*.jar ./migrator/lib
cp ./spark-dynamodb/target/scala-2.11/spark-dynamodb-assembly-*.jar ./migrator/lib
cp ./spark-kinesis/target/scala-2.11/spark-streaming-kinesis-asl-assembly-*.jar ./migrator/lib

sbt -Djava.io.tmpdir="$TMPDIR" migrator/assembly
2 changes: 2 additions & 0 deletions docker-compose-tests.yml
@@ -54,6 +54,8 @@ services:
volumes:
- ./migrator/target/scala-2.11:/jars
- ./tests/src/test/configurations:/app/configurations
# Workaround for https://github.com/awslabs/emr-dynamodb-connector/issues/50
- ${PWD}/tests/docker/job-flow.json:/mnt/var/lib/info/job-flow.json

spark-worker:
image: bde2020/spark-worker:2.4.4-hadoop2.7
67 changes: 63 additions & 4 deletions migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala
@@ -4,7 +4,6 @@ import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.services.dynamodbv2.{
AmazonDynamoDBClientBuilder,
AmazonDynamoDBStreamsClient,
AmazonDynamoDBStreamsClientBuilder
}
import com.amazonaws.services.dynamodbv2.model.{
@@ -17,7 +16,14 @@ import com.amazonaws.services.dynamodbv2.model.{
TableDescription,
UpdateTableRequest
}
import com.scylladb.migrator.config.{ DynamoDBEndpoint, SourceSettings, TargetSettings }
import com.scylladb.migrator.config.{
AWSCredentials,
DynamoDBEndpoint,
SourceSettings,
TargetSettings
}
import org.apache.hadoop.dynamodb.DynamoDBConstants
import org.apache.hadoop.mapred.JobConf
import org.apache.log4j.LogManager

import scala.util.{ Failure, Success, Try }
@@ -64,7 +70,8 @@ object DynamoUtils {

def enableDynamoStream(source: SourceSettings.DynamoDB): Unit = {
val sourceClient = buildDynamoClient(source.endpoint, source.credentials, source.region)
val sourceStreamsClient = buildDynamoStreamsClient(source.credentials, source.region)
val sourceStreamsClient =
buildDynamoStreamsClient(source.endpoint, source.credentials, source.region)

sourceClient
.updateTable(
@@ -114,12 +121,64 @@
builder.build()
}

def buildDynamoStreamsClient(creds: Option[AWSCredentialsProvider], region: Option[String]) = {
def buildDynamoStreamsClient(endpoint: Option[DynamoDBEndpoint],
creds: Option[AWSCredentialsProvider],
region: Option[String]) = {
val builder = AmazonDynamoDBStreamsClientBuilder.standard()

endpoint.foreach { endpoint =>
builder
.withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration(
endpoint.renderEndpoint,
region.getOrElse("empty")))
}
creds.foreach(builder.withCredentials)
region.foreach(builder.withRegion)

builder.build()
}

/**
* Optionally set a configuration. If `maybeValue` is empty, nothing is done. Otherwise,
* its value is set to the `name` property on the `jobConf`.
*
* @param jobConf Target JobConf to configure
* @param name Name of the Hadoop configuration key
* @param maybeValue Optional value to set.
*/
def setOptionalConf(jobConf: JobConf, name: String, maybeValue: Option[String]): Unit =
for (value <- maybeValue) {
jobConf.set(name, value)
}

/**
* Set the common configuration of both read and write DynamoDB jobs.
* @param jobConf Target JobConf to configure
* @param maybeRegion AWS region
* @param maybeEndpoint AWS endpoint
* @param maybeScanSegments Scan segments
* @param maybeMaxMapTasks Max map tasks
* @param maybeAwsCredentials AWS credentials
*/
def setDynamoDBJobConf(jobConf: JobConf,
maybeRegion: Option[String],
maybeEndpoint: Option[DynamoDBEndpoint],
maybeScanSegments: Option[Int],
maybeMaxMapTasks: Option[Int],
maybeAwsCredentials: Option[AWSCredentials]): Unit = {
setOptionalConf(jobConf, DynamoDBConstants.REGION, maybeRegion)
setOptionalConf(jobConf, DynamoDBConstants.ENDPOINT, maybeEndpoint.map(_.renderEndpoint))
setOptionalConf(jobConf, DynamoDBConstants.SCAN_SEGMENTS, maybeScanSegments.map(_.toString))
setOptionalConf(jobConf, DynamoDBConstants.MAX_MAP_TASKS, maybeMaxMapTasks.map(_.toString))
for (credentials <- maybeAwsCredentials) {
jobConf.set(DynamoDBConstants.DYNAMODB_ACCESS_KEY_CONF, credentials.accessKey)
jobConf.set(DynamoDBConstants.DYNAMODB_SECRET_KEY_CONF, credentials.secretKey)
}
jobConf.set(
"mapred.output.format.class",
"org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
}

}
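
For orientation only (not part of the commit), this is roughly how the new setDynamoDBJobConf helper might be invoked. Every value is a placeholder, the endpoint is left unset, and the AWSCredentials argument order (accessKey, secretKey) is assumed from its usage in the hunk above:

import com.scylladb.migrator.DynamoUtils
import com.scylladb.migrator.config.AWSCredentials
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.SparkSession

object JobConfSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("jobconf-sketch").getOrCreate()
    val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration)

    // Placeholder values for illustration only.
    DynamoUtils.setDynamoDBJobConf(
      jobConf,
      maybeRegion = Some("us-east-1"),
      maybeEndpoint = None, // e.g. a DynamoDBEndpoint pointing at Alternator
      maybeScanSegments = Some(8),
      maybeMaxMapTasks = Some(4),
      maybeAwsCredentials = Some(AWSCredentials("ACCESS_KEY_ID", "SECRET_ACCESS_KEY"))
    )

    // jobConf now carries the DynamoDB input/output format classes plus the
    // optional region, scan-segments, max-map-tasks and credential settings.
    spark.stop()
  }
}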
216 changes: 14 additions & 202 deletions migrator/src/main/scala/com/scylladb/migrator/Migrator.scala
@@ -1,21 +1,10 @@
package com.scylladb.migrator

import java.nio.charset.StandardCharsets
import java.nio.file.{ Files, Paths }
import java.util.concurrent.{ ScheduledThreadPoolExecutor, TimeUnit }
import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter
import com.datastax.spark.connector.rdd.partitioner.{ CassandraPartition, CqlTokenRange }
import com.datastax.spark.connector.rdd.partitioner.dht.Token
import com.datastax.spark.connector.writer._
import com.scylladb.migrator.alternator.AlternatorMigrator
import com.scylladb.migrator.config._
import com.scylladb.migrator.writers.DynamoStreamReplication
import com.scylladb.migrator.scylla.ScyllaMigrator
import org.apache.log4j.{ Level, LogManager, Logger }
import org.apache.spark.sql._
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.kinesis.{ KinesisInputDStream, SparkAWSCredentials }
import sun.misc.{ Signal, SignalHandler }

import scala.util.control.NonFatal

object Migrator {
val log = LogManager.getLogger("com.scylladb.migrator")
@@ -27,7 +16,6 @@ object Migrator {
.config("spark.task.maxFailures", "1024")
.config("spark.stage.maxConsecutiveAttempts", "60")
.getOrCreate
val streamingContext = new StreamingContext(spark.sparkContext, Seconds(5))

Logger.getRootLogger.setLevel(Level.WARN)
log.setLevel(Level.INFO)
@@ -39,202 +27,26 @@ object Migrator {

log.info(s"Loaded config: ${migratorConfig}")

val scheduler = new ScheduledThreadPoolExecutor(1)

val sourceDF =
migratorConfig.source match {
case cassandraSource: SourceSettings.Cassandra =>
readers.Cassandra.readDataframe(
try {
(migratorConfig.source, migratorConfig.target) match {
case (cassandraSource: SourceSettings.Cassandra, scyllaTarget: TargetSettings.Scylla) =>
val sourceDF = readers.Cassandra.readDataframe(
spark,
cassandraSource,
cassandraSource.preserveTimestamps,
migratorConfig.skipTokenRanges)
case parquetSource: SourceSettings.Parquet =>
readers.Parquet.readDataFrame(spark, parquetSource)
case dynamoSource: SourceSettings.DynamoDB =>
val tableDesc = DynamoUtils
.buildDynamoClient(dynamoSource.endpoint, dynamoSource.credentials, dynamoSource.region)
.describeTable(dynamoSource.table)
.getTable

readers.DynamoDB.readDataFrame(spark, dynamoSource, tableDesc)
}

log.info("Created source dataframe; resulting schema:")
sourceDF.dataFrame.printSchema()

val tokenRangeAccumulator =
if (!sourceDF.savepointsSupported) None
else {
val tokenRangeAccumulator = TokenRangeAccumulator.empty
spark.sparkContext.register(tokenRangeAccumulator, "Token ranges copied")

addUSR2Handler(migratorConfig, tokenRangeAccumulator)
startSavepointSchedule(scheduler, migratorConfig, tokenRangeAccumulator)

Some(tokenRangeAccumulator)
}

log.info(
"We need to transfer: " + sourceDF.dataFrame.rdd.getNumPartitions + " partitions in total")

if (migratorConfig.source.isInstanceOf[SourceSettings.Cassandra]) {
val partitions = sourceDF.dataFrame.rdd.partitions
val cassandraPartitions = partitions.map(p => {
p.asInstanceOf[CassandraPartition[_, _]]
})
var allTokenRanges = Set[(Token[_], Token[_])]()
cassandraPartitions.foreach(p => {
p.tokenRanges
.asInstanceOf[Vector[CqlTokenRange[_, _]]]
.foreach(tr => {
val range =
Set((tr.range.start.asInstanceOf[Token[_]], tr.range.end.asInstanceOf[Token[_]]))
allTokenRanges = allTokenRanges ++ range
})

})

log.info("All token ranges extracted from partitions size:" + allTokenRanges.size)

if (migratorConfig.skipTokenRanges != None) {
log.info(
"Savepoints array defined, size of the array: " + migratorConfig.skipTokenRanges.size)

val diff = allTokenRanges.diff(migratorConfig.skipTokenRanges)
log.info("Diff ... total diff of full ranges to savepoints is: " + diff.size)
log.debug("Dump of the missing tokens: ")
log.debug(diff)
}
}

log.info("Starting write...")

try {
migratorConfig.target match {
case target: TargetSettings.Scylla =>
writers.Scylla.writeDataframe(
target,
migratorConfig.renames,
sourceDF.dataFrame,
sourceDF.timestampColumns,
tokenRangeAccumulator)
case target: TargetSettings.DynamoDB =>
val sourceAndDescriptions = migratorConfig.source match {
case source: SourceSettings.DynamoDB =>
if (target.streamChanges) {
log.info(
"Source is a Dynamo table and change streaming requested; enabling Dynamo Stream")
DynamoUtils.enableDynamoStream(source)
}
val sourceDesc =
DynamoUtils
.buildDynamoClient(source.endpoint, source.credentials, source.region)
.describeTable(source.table)
.getTable

Some(
(
source,
sourceDesc,
DynamoUtils.replicateTableDefinition(
sourceDesc,
target
)
))

case _ =>
None
}

writers.DynamoDB.writeDataframe(
target,
migratorConfig.renames,
sourceDF.dataFrame,
sourceAndDescriptions.map(_._3))

sourceAndDescriptions.foreach {
case (source, sourceDesc, targetDesc) =>
if (target.streamChanges) {
log.info("Done transferring table snapshot. Starting to transfer changes")

DynamoStreamReplication.createDStream(
spark,
streamingContext,
source,
target,
sourceDF.dataFrame.schema,
sourceDesc,
targetDesc,
migratorConfig.renames)

streamingContext.start()
streamingContext.awaitTermination()
}
}
ScyllaMigrator.migrate(migratorConfig, scyllaTarget, sourceDF)
case (parquetSource: SourceSettings.Parquet, scyllaTarget: TargetSettings.Scylla) =>
val sourceDF = readers.Parquet.readDataFrame(spark, parquetSource)
ScyllaMigrator.migrate(migratorConfig, scyllaTarget, sourceDF)
case (dynamoSource: SourceSettings.DynamoDB, alternatorTarget: TargetSettings.DynamoDB) =>
AlternatorMigrator.migrate(dynamoSource, alternatorTarget, migratorConfig.renames)
case _ =>
sys.error("Unsupported combination of source and target.")
}
} catch {
case NonFatal(e) => // Catching everything on purpose to try and dump the accumulator state
log.error(
"Caught error while writing the DataFrame. Will create a savepoint before exiting",
e)
} finally {
tokenRangeAccumulator.foreach(dumpAccumulatorState(migratorConfig, _, "final"))
scheduler.shutdown()
spark.stop()
}
}

def savepointFilename(path: String): String =
s"${path}/savepoint_${System.currentTimeMillis / 1000}.yaml"

def dumpAccumulatorState(config: MigratorConfig,
accumulator: TokenRangeAccumulator,
reason: String): Unit = {
val filename =
Paths.get(savepointFilename(config.savepoints.path)).normalize
val rangesToSkip = accumulator.value.get.map(range =>
(range.range.start.asInstanceOf[Token[_]], range.range.end.asInstanceOf[Token[_]]))

val modifiedConfig = config.copy(
skipTokenRanges = config.skipTokenRanges ++ rangesToSkip
)

Files.write(filename, modifiedConfig.render.getBytes(StandardCharsets.UTF_8))

log.info(
s"Created a savepoint config at ${filename} due to ${reason}. Ranges added: ${rangesToSkip}")
}

def startSavepointSchedule(svc: ScheduledThreadPoolExecutor,
config: MigratorConfig,
acc: TokenRangeAccumulator): Unit = {
val runnable = new Runnable {
override def run(): Unit =
try dumpAccumulatorState(config, acc, "schedule")
catch {
case e: Throwable =>
log.error("Could not create the savepoint. This will be retried.", e)
}
}

log.info(
s"Starting savepoint schedule; will write a savepoint every ${config.savepoints.intervalSeconds} seconds")

svc.scheduleAtFixedRate(runnable, 0, config.savepoints.intervalSeconds, TimeUnit.SECONDS)
}

def addUSR2Handler(config: MigratorConfig, acc: TokenRangeAccumulator) = {
log.info(
"Installing SIGINT/TERM/USR2 handler. Send this to dump the current progress to a savepoint.")

val handler = new SignalHandler {
override def handle(signal: Signal): Unit =
dumpAccumulatorState(config, acc, signal.toString)
}

Signal.handle(new Signal("USR2"), handler)
Signal.handle(new Signal("TERM"), handler)
Signal.handle(new Signal("INT"), handler)
}
}