Migrate Cassandra data with Azure Databricks

This sample lets you migrate data between Apache Cassandra tables using Spark on Azure Databricks, while preserving the original writetime of each cell. This is useful for historical data loads during a live migration.

Set up Azure Databricks

Prerequisites

  • Provision an Azure Databricks cluster with network access to your source and target Cassandra clusters.

  • Ensure you've already migrated the keyspace/table schema from your source Cassandra database to your target Cassandra database (a minimal sketch of one way to do this follows this list).
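
If you still need to create the target schema, the sketch below shows one way to run the DDL from a notebook through the Spark Cassandra Connector, once the dependency jar described below is installed (you can equally use cqlsh). Everything here is illustrative: the keyspace, table, columns, and replication settings are placeholder assumptions; mirror your actual source schema instead.

import org.apache.spark.SparkConf
import com.datastax.spark.connector.cql.CassandraConnector

// Sketch: create the target keyspace/table before migrating.
// The schema and replication settings below are hypothetical;
// the table definition must match your source schema exactly.
val ddlConf = new SparkConf()
  .set("spark.cassandra.connection.host", "<target Cassandra host name/IP>")
  .set("spark.cassandra.auth.username", "<username here>")
  .set("spark.cassandra.auth.password", "<password here>")

CassandraConnector(ddlConf).withSessionDo { session =>
  session.execute(
    "CREATE KEYSPACE IF NOT EXISTS <target keyspace name> WITH replication = " +
      "{'class': 'NetworkTopologyStrategy', 'datacenter1': 3}")
  session.execute(
    "CREATE TABLE IF NOT EXISTS <target keyspace name>.<target table name> " +
      "(id text PRIMARY KEY, race_start_date date, race_end_date date)")
}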

Provision a Spark cluster

Select an Azure Databricks runtime version which supports Spark 3.0 or higher.

(Screenshot: selecting the Databricks runtime version)

Add Cassandra Migrator Spark dependencies

  • Download the dependency jar here *
  • Upload and install the jar on your Databricks cluster:

(Screenshot: installing the dependency jar on the Databricks cluster)

Select Install, and then restart the cluster when installation is complete.

* You can also build the dependency jar using SBT by running ./build.sh in the /build_files directory of this repo.

Note

Make sure that you restart the Databricks cluster after the dependency jar has been installed.

Configure Spark Connector throughput

To maximize throughput for large migrations, you may need to change Spark parameters at the cluster level. Apply these settings under advanced options in the cluster configuration, as shown below. You may also want to increase the number of workers in your Spark cluster. A per-session alternative is sketched after the settings.

spark.cassandra.output.batch.size.rows 1
spark.cassandra.output.concurrent.writes 500
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
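
As a sketch only (not part of the original sample): the Spark Cassandra Connector also picks up spark.cassandra.* settings from the active Spark configuration, so the same values can be supplied when building the session. Whether the migrator honors session-level values depends on how it constructs its connections, so the cluster-level configuration above remains the safer route.

import org.apache.spark.sql.SparkSession

// Sketch: the same throughput settings applied at session-build time.
val spark = SparkSession
  .builder()
  .appName("cassandra-migrator")
  .config("spark.cassandra.output.batch.size.rows", "1")
  .config("spark.cassandra.output.concurrent.writes", "500")
  .config("spark.cassandra.concurrent.reads", "512")
  .config("spark.cassandra.output.batch.grouping.buffer.size", "1000")
  .config("spark.cassandra.connection.keep_alive_ms", "600000000")
  .getOrCreate()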

(Screenshot: Spark settings in the cluster's advanced options)

Migrate Cassandra tables

Create a new Scala notebook in Databricks with two separate cells:

Read Cassandra source table

In this example, we migrate from a source cluster that does not use SSL to a target cluster that does. Adjust sslOptions for your source/target tables accordingly.

import org.apache.spark.sql._

val spark = SparkSession
      .builder()
      .appName("cassandra-migrator")
      .config("spark.task.maxFailures", "1024")
      .config("spark.stage.maxConsecutiveAttempts", "60")
      .getOrCreate()

import com.cassandra.migrator.readers.Cassandra
import com.cassandra.migrator.config._
import com.datastax.spark.connector.cql.CassandraConnector

val cassandraSource = new SourceSettings.Cassandra(
  host = "<source Cassandra host name/IP here>",
  port = 9042,
  localDC = None,
  credentials = Some(Credentials(
    username="<username here>", 
    password="<password here>")
  ),
  sslOptions = Some(SSLOptions(
    clientAuthEnabled=false,
    enabled=false,
    trustStorePassword = None,
    trustStorePath = None,
    trustStoreType = None,
    keyStorePassword = None,
    keyStorePath = None,
    keyStoreType = None,
    enabledAlgorithms = None,
    protocol = Some("TLS")
  )),  
  keyspace = "<source keyspace name>",
  table = "<source table name>",
  splitCount = Some(1), // Number of splits to use - this should be at minimum the amount of cores available in the Spark cluster, and optimally more; higher splits will lead to more fine-grained resumes. Aim for 8 * (Spark cores).
  connections = Some(1), // Number of connections to use to Cassandra when copying
  fetchSize = 1000, // Number of rows to fetch in each read
  preserveTimestamps = true, // Preserve TTLs and WRITETIMEs of cells in the source database. Note that this option is *incompatible* when copying tables with collections (lists, maps, sets).
  where = None // Optional condition to filter source table data that will be migrated, e.g. where: race_start_date = '2015-05-27' AND race_end_date = '2015-05-27'
)

val sourceDF = Cassandra.readDataframe(
  spark,
  cassandraSource,
  cassandraSource.preserveTimestamps,
  tokenRangesToSkip = Set()
)
sourceDF.dataFrame.printSchema()
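
Optionally (not part of the original sample), you can sanity-check the read before writing. Note that count() triggers a second full scan of the source table, and sourceRowCount is just an illustrative name:

// Optional sanity check (sketch): preview rows and record the source row count
// for comparison after the migration completes.
sourceDF.dataFrame.show(5, truncate = false)
val sourceRowCount = sourceDF.dataFrame.count()
println(s"Rows read from source: $sourceRowCount")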

Migrate to Cassandra target table

import com.cassandra.migrator.writers

implicit val spark = SparkSession
      .builder()
      .appName("cassandra-migrator")
      .config("spark.task.maxFailures", "1024")
      .config("spark.stage.maxConsecutiveAttempts", "60")
      .getOrCreate()

val target = new TargetSettings.Cassandra(
  host = "<target Cassandra host name/IP>",
  port = 9042,
  localDC = None,
  credentials = Some(com.cassandra.migrator.config.Credentials(
    username="<username here>", 
    password="<password here>")
  ),
  sslOptions = Some(SSLOptions(
    clientAuthEnabled=false,
    enabled=true,
    trustStorePassword = None,
    trustStorePath = None,
    trustStoreType = None,
    keyStorePassword = None,
    keyStorePath = None,
    keyStoreType = None,
    enabledAlgorithms = Some(Set("TLS_RSA_WITH_AES_128_CBC_SHA","TLS_RSA_WITH_AES_256_CBC_SHA")),
    protocol = Some("TLS")
  )),   
  keyspace = "<target keyspace name>",
  table = "<target table name>",
  connections = Some(1),
  stripTrailingZerosForDecimals = false
)

writers.Cassandra.writeDataframe(
            target,
            List(),
            sourceDF.dataFrame,
            sourceDF.timestampColumns
)
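
To spot-check that writetimes survived, you can read a few rows back over a connector session, as in this sketch. The column inside WRITETIME() is hypothetical (substitute one of your own non-key columns), and the SSL setting assumes the target cluster shown above:

import org.apache.spark.SparkConf
import com.datastax.spark.connector.cql.CassandraConnector

// Optional spot check (sketch): read back a few rows and their writetimes from
// the target. Host, credentials, and column names are placeholders/assumptions.
val checkConf = new SparkConf()
  .set("spark.cassandra.connection.host", "<target Cassandra host name/IP>")
  .set("spark.cassandra.auth.username", "<username here>")
  .set("spark.cassandra.auth.password", "<password here>")
  .set("spark.cassandra.connection.ssl.enabled", "true")

CassandraConnector(checkConf).withSessionDo { session =>
  val rows = session.execute(
    "SELECT id, WRITETIME(race_start_date) " +
      "FROM <target keyspace name>.<target table name> LIMIT 5")
  rows.forEach(row => println(row))
}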

Validate Migration

To validate the migration using row comparison, create a third cell with the following, adjusting the parameters to your preferred tolerances:

import com.cassandra.migrator.Validator
import com.cassandra.migrator.config._

val spark = SparkSession
      .builder()
      .appName("cassandra-migrator")
      .config("spark.task.maxFailures", "1024")
      .config("spark.stage.maxConsecutiveAttempts", "60")
      .getOrCreate()

val validatorConfig = new Validation(
  compareTimestamps = true,       // also compare TTL and WRITETIME values
  ttlToleranceMillis = 1,         // allowed TTL difference between source and target
  writetimeToleranceMillis = 1,   // allowed WRITETIME difference between source and target
  failuresToFetch = 10,           // how many differing rows to fetch and report
  floatingPointTolerance = 1.0    // allowed difference when comparing floating point values
)

val migratorConfig = new MigratorConfig(
  cassandraSource,        // source settings from the first cell
  target,                 // target settings from the second cell
  List(),                 // column renames (none here)
  savepoints = null,      // savepoints are not used in this sample
  skipTokenRanges = Set(),
  validatorConfig
)

Validator.runValidation(migratorConfig)(spark)

If rows do not match, this will return something like the following output:

(Screenshot: example validation output)
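
Note that runValidation also returns the detected failures as a value; the retry steps below use that return value to build a filtered re-migration.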

Retry transient failures

The row comparison in validation may return an error for rows that are missing from the target table, for example:

(Screenshot: validation errors for rows missing in the target table)

This may indicate that a transient failure occurred during the overall migration process. If this happens, you can use the Validator to extract the missing records and re-run the migration for only those records, provided you can specify the primary key for filtering.

Add the sample cell below after your existing cells in the same notebook. It constructs the values required in the "where" parameter of SourceSettings from the row-comparison failures, and then writes only those filtered records to the target table. Be sure to change the value of primaryKey to the name of your primary key field, and replace the credentials and source keyspace/table:

implicit val spark = SparkSession
      .builder()
      .appName("cassandra-migrator")
      .config("spark.task.maxFailures", "1024")
      .config("spark.stage.maxConsecutiveAttempts", "60")
      .getOrCreate()

//construct Cassandra IN clause to filter only missing rows - ***CHANGE primaryKey
val primaryKey = "<primary key of source/target table>"
val failures = Validator.runValidation(migratorConfig)(spark)
val whereValues = failures
  .map(failure => s"'${failure.row.getString(primaryKey)}'")
  .mkString(s"$primaryKey IN (", ",", ")")
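// Example result (hypothetical key values): id IN ('key1','key2','key3')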

// Re-set cassandraSource, now filtering to only the missing rows
val cassandraSource = new SourceSettings.Cassandra(
  host = "<source Cassandra host name/IP here>",
  port = 9042,
  localDC = None,
  credentials = Some(Credentials(
    username="<username here>", 
    password="<password here>")
  ),
  sslOptions = Some(SSLOptions(
    clientAuthEnabled=false,
    enabled=false, // match your source cluster; the source in this sample does not use SSL
    trustStorePassword = None,
    trustStorePath = None,
    trustStoreType = None,
    keyStorePassword = None,
    keyStorePath = None,
    keyStoreType = None,
    enabledAlgorithms = None,
    protocol = Some("TLS")
  )),  
  keyspace = "<source keyspace name>",
  table = "<source table name>",
  splitCount = Some(1),
  connections = Some(1), 
  fetchSize = 1000, 
  preserveTimestamps = true,
  //specifying where values extracted from validation above to filter only missing records for migration
  where = Some(whereValues)
)

val sourceDF = Cassandra.readDataframe(
  spark,
  cassandraSource,
  cassandraSource.preserveTimestamps,
  tokenRangesToSkip = Set()
)
// Re-use the existing target config to re-migrate only the failed records
writers.Cassandra.writeDataframe(
            target,
            List(),
            sourceDF.dataFrame,
            sourceDF.timestampColumns
)
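
Once the filtered re-migration completes, re-run the validation cell above; it should no longer report the missing rows.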

SSLOptions Parameters

| Parameter | Description | Default value |
| --- | --- | --- |
| enabled | Enable secure connection to Cassandra cluster | false |
| trustStorePath | Path for the trust store being used | None |
| trustStorePassword | Trust store password | None |
| trustStoreType | Trust store type | JKS |
| protocol | SSL protocol | TLS |
| enabledAlgorithms | SSL cipher suites | Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA") |
| clientAuthEnabled | Enable 2-way secure connection to Cassandra cluster | false |
| keyStorePath | Path for the key store being used | None |
| keyStorePassword | Key store password | None |
| keyStoreType | Key store type | JKS |
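
For example, to enable one-way TLS against a cluster whose certificate requires a custom trust store (a sketch only; the DBFS path and password are hypothetical placeholders):

import com.cassandra.migrator.config._

// Sketch: SSLOptions for one-way TLS with a custom JKS trust store. Pass this
// as sslOptions = Some(sslWithTrustStore) in SourceSettings/TargetSettings.
val sslWithTrustStore = SSLOptions(
  clientAuthEnabled = false,
  enabled = true,
  trustStorePassword = Some("<trust store password>"),
  trustStorePath = Some("/dbfs/FileStore/certs/truststore.jks"),
  trustStoreType = Some("JKS"),
  keyStorePassword = None,
  keyStorePath = None,
  keyStoreType = None,
  enabledAlgorithms = None,
  protocol = Some("TLS")
)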
