WIP Add basic migration test from Cassandra
julienrf committed Mar 31, 2024
1 parent 1b3a104 commit 22ca7a4
Showing 8 changed files with 242 additions and 7 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/tests.yml
@@ -25,8 +25,10 @@ jobs:
       - name: Set up services
         run: |
           docker compose -f docker-compose-tests.yml up -d
-          .github/wait-for-port.sh 8000 # ScyllaDB
+          .github/wait-for-port.sh 9042 # ScyllaDB
+          .github/wait-for-port.sh 8000 # ScyllaDB Alternator
           .github/wait-for-port.sh 8001 # DynamoDB
+          .github/wait-for-port.sh 9043 # Cassandra
           .github/wait-for-port.sh 8080 # Spark master
           .github/wait-for-port.sh 8081 # Spark worker
       - name: Run tests
14 changes: 9 additions & 5 deletions build.sbt
@@ -3,12 +3,15 @@ import sbt.librarymanagement.InclExclRule
 val awsSdkVersion = "1.11.728"
 val sparkVersion = "2.4.4"

+inThisBuild(
+  List(
+    organization := "com.scylladb",
+    scalaVersion := "2.11.12",
+    scalacOptions += "-target:jvm-1.8"
+  )
+)
+
 lazy val migrator = (project in file("migrator")).settings(
-  inThisBuild(
-    List(
-      organization := "com.scylladb",
-      scalaVersion := "2.11.12"
-    )),
   name := "scylla-migrator",
   version := "0.0.1",
   mainClass := Some("com.scylladb.migrator.Migrator"),
@@ -72,6 +75,7 @@ lazy val migrator = (project in file("migrator")).settings(
 lazy val tests = project.in(file("tests")).settings(
   libraryDependencies ++= Seq(
     "com.amazonaws" % "aws-java-sdk-dynamodb" % awsSdkVersion,
+    "org.apache.cassandra" % "java-driver-query-builder" % "4.18.0",
     "org.scalameta" %% "munit" % "0.7.29",
     "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0"
   ),
13 changes: 13 additions & 0 deletions docker-compose-tests.yml
@@ -14,6 +14,17 @@ services:
- "8001:8000"
working_dir: /home/dynamodblocal

cassandra:
image: cassandra:latest
networks:
- scylla
volumes:
- ./tests/docker/cassandra:/var/lib/cassandra
ports:
- "9043:9042"
expose:
- 9043

scylla:
image: scylladb/scylla:latest
networks:
@@ -22,6 +33,7 @@
- "./tests/docker/scylla:/var/lib/scylla"
ports:
- "8000:8000"
- "9042:9042"
command: "--smp 2 --memory 2048M --alternator-port 8000 --alternator-write-isolation always_use_lwt"

spark-master:
@@ -54,6 +66,7 @@
     volumes:
       - ./migrator/target/scala-2.11:/jars
       - ./tests/src/test/configurations:/app/configurations
+      - ./tests/docker/spark-master:/app/savepoints
       # Workaround for https://github.com/awslabs/emr-dynamodb-connector/issues/50
       - ${PWD}/tests/docker/job-flow.json:/mnt/var/lib/info/job-flow.json

3 changes: 3 additions & 0 deletions tests/docker/.gitignore
@@ -0,0 +1,3 @@
cassandra/
scylla/
spark-master/
43 changes: 43 additions & 0 deletions tests/src/test/configurations/cassandra-to-scylla-basic.yaml
@@ -0,0 +1,43 @@
source:
  type: cassandra
  host: cassandra
  port: 9042
  localDC: datacenter1
  credentials:
    username: dummy
    password: dummy
  keyspace: test
  table: basictest
  consistencyLevel: LOCAL_QUORUM
  preserveTimestamps: true
  splitCount: 8
  connections: 8
  fetchSize: 1000

target:
  type: scylla
  host: scylla
  port: 9042
  localDC: datacenter1
  credentials:
    username: dummy
    password: dummy
  keyspace: test
  table: basictest
  consistencyLevel: LOCAL_QUORUM
  connections: 16
  stripTrailingZerosForDecimals: false

renames: []

savepoints:
  path: /app/savepoints
  intervalSeconds: 300
skipTokenRanges: []
validation:
  compareTimestamps: true
  ttlToleranceMillis: 60000
  writetimeToleranceMillis: 1000
  failuresToFetch: 100
  floatingPointTolerance: 0.001
  timestampMsTolerance: 0
2 changes: 1 addition & 1 deletion
@@ -79,7 +79,7 @@ trait MigratorSuite extends munit.FunSuite {
       }
     } catch {
       case any: Throwable =>
-        fail(s"Failed to created table ${name} in database ${sourceDDb}: ${any}")
+        fail(s"Failed to created table ${name} in database ${sourceDDb}", any)
     }
     name
   },
33 changes: 33 additions & 0 deletions tests/src/test/scala/com/scylladb/migrator/scylla/BasicMigrationTest.scala
@@ -0,0 +1,33 @@
package com.scylladb.migrator.scylla

import com.datastax.oss.driver.api.querybuilder.QueryBuilder

class BasicMigrationTest extends MigratorSuite {

  withTable("BasicTest").test("Read from source and write to target") { tableName =>
    val insertStatement =
      QueryBuilder
        .insertInto(keyspace, tableName)
        .json("""{ "id": "12345", "foo": "bar" }""")
        .build()

    // Insert some items
    sourceCassandra.execute(insertStatement)

    // Perform the migration
    submitSparkJob("cassandra-to-scylla-basic.yaml")

    // Check that the schema has been replicated to the target table
    val selectQuery =
      QueryBuilder.selectFrom(keyspace, tableName).all().build()
    val sourceColumns =
      sourceCassandra.execute(selectQuery).getColumnDefinitions
    val targetColumns =
      targetScylla.execute(selectQuery).getColumnDefinitions
    assertEquals(targetColumns, sourceColumns)

    // Check that the items have been migrated to the target table
    // TODO
  }

}
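The item-level check above is left as a TODO in this commit. A minimal sketch of how it could be completed with the same driver API, reusing the test's `selectQuery`, `sourceCassandra`, and `targetScylla` (illustrative only, not part of the commit):

    // Hypothetical completion of the TODO: fetch all rows from both tables
    // and compare them column by column.
    import scala.jdk.CollectionConverters._
    val sourceRows = sourceCassandra.execute(selectQuery).all().asScala
    val targetRows = targetScylla.execute(selectQuery).all().asScala
    assertEquals(targetRows.size, sourceRows.size)
    // The test inserts a single row, so row ordering is not a concern here.
    sourceRows.zip(targetRows).foreach { case (sourceRow, targetRow) =>
      assertEquals(targetRow.getString("id"), sourceRow.getString("id"))
      assertEquals(targetRow.getString("foo"), sourceRow.getString("foo"))
    }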
137 changes: 137 additions & 0 deletions tests/src/test/scala/com/scylladb/migrator/scylla/MigratorSuite.scala
@@ -0,0 +1,137 @@
package com.scylladb.migrator.scylla

import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.`type`.DataTypes
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder

import java.net.InetSocketAddress
import scala.sys.process.Process
import scala.jdk.CollectionConverters._
import scala.util.chaining._

/**
 * Base class for implementing end-to-end tests.
 *
 * It expects external services (Cassandra, Scylla, Spark, etc.) to be running.
 * See the files `CONTRIBUTING.md` and `docker-compose-tests.yml` for more information.
 */
trait MigratorSuite extends munit.FunSuite {

  val keyspace = "test"

  /** Client of a source Cassandra instance */
  val sourceCassandra: CqlSession = CqlSession
    .builder()
    .addContactPoint(new InetSocketAddress("localhost", 9043))
    .withLocalDatacenter("datacenter1")
    .withAuthCredentials("dummy", "dummy")
    .build()

  /** Client of a target ScyllaDB instance */
  val targetScylla: CqlSession = CqlSession
    .builder()
    .addContactPoint(new InetSocketAddress("localhost", 9042))
    .withLocalDatacenter("datacenter1")
    .withAuthCredentials("dummy", "dummy")
    .build()

  /**
   * Fixture automating the housekeeping work required to migrate a table.
   *
   * It deletes the table from both the source and target databases if it
   * already exists, and then recreates it in the source database.
   *
   * After the test is executed, it deletes the table from both the source
   * and target databases.
   *
   * @param name Name of the table
   */
  def withTable(name: String): FunFixture[String] = FunFixture(
    setup = { _ =>
      def deleteTableIfExists(database: CqlSession): Unit =
        try {
          val query = SchemaBuilder.dropTable(keyspace, name).ifExists().build()
          database
            .execute(query)
            .ensuring(_.wasApplied())
        } catch {
          case any: Throwable =>
            fail(s"Failed to delete table ${name}", any)
        }
      // Make sure that neither the source nor the target database still contains the table
      deleteTableIfExists(sourceCassandra)
      deleteTableIfExists(targetScylla)
      try {
        // Create the table in the source database
        val createTableRequest =
          SchemaBuilder
            .createTable(keyspace, name)
            .withPartitionKey("id", DataTypes.TEXT)
            .withColumn("foo", DataTypes.TEXT)
            .build()
        sourceCassandra.execute(createTableRequest)
      } catch {
        case any: Throwable =>
          fail(s"Failed to create table ${name} in database ${sourceCassandra}", any)
      }
      name
    },
    teardown = { _ =>
      // Clean up both the source and target databases, because we assume that
      // the test replicated the table to the target database
      val dropTableQuery = SchemaBuilder.dropTable(keyspace, name).build()
      targetScylla.execute(dropTableQuery)
      sourceCassandra.execute(dropTableQuery)
      ()
    }
  )

  /**
   * Run a migration by submitting a Spark job to the Spark cluster.
   *
   * @param migratorConfigFile Configuration file to use. Write your
   *                           configuration files in the directory
   *                           `src/test/configurations`, which is
   *                           automatically mounted to the Spark
   *                           cluster by Docker Compose.
   */
  def submitSparkJob(migratorConfigFile: String): Unit = {
    Process(
      Seq(
        "docker",
        "compose",
        "-f", "docker-compose-tests.yml",
        "exec",
        "spark-master",
        "/spark/bin/spark-submit",
        "--class", "com.scylladb.migrator.Migrator",
        "--master", "spark://spark-master:7077",
        "--conf", "spark.driver.host=spark-master",
        "--conf", s"spark.scylla.config=/app/configurations/${migratorConfigFile}",
        // Uncomment one of the following lines to attach a remote debugger to the Spark driver or executor.
        // "--conf", "spark.driver.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005",
        // "--conf", "spark.executor.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006",
        "/jars/scylla-migrator-assembly-0.0.1.jar"
      )
    ).run().exitValue().tap { statusCode =>
      assertEquals(statusCode, 0, "Spark job failed")
    }
    ()
  }

  override def beforeAll(): Unit = {
    val keyspaceStatement =
      SchemaBuilder
        .createKeyspace(keyspace)
        .ifNotExists()
        .withReplicationOptions(
          Map[String, AnyRef]("class" -> "SimpleStrategy", "replication_factor" -> new Integer(1)).asJava)
        .build()
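    // The statement above should correspond to CQL along these lines (illustrative,
    // not part of the commit):
    //   CREATE KEYSPACE IF NOT EXISTS test
    //   WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}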
    sourceCassandra.execute(keyspaceStatement)
    targetScylla.execute(keyspaceStatement)
  }

  override def afterAll(): Unit = {
    sourceCassandra.close()
    targetScylla.close()
  }

}
