diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index e66dbc13..3bc7ccc1 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -25,8 +25,10 @@ jobs: - name: Set up services run: | docker compose -f docker-compose-tests.yml up -d - .github/wait-for-port.sh 8000 # ScyllaDB + .github/wait-for-port.sh 9042 # ScyllaDB + .github/wait-for-port.sh 8000 # ScyllaDB Alternator .github/wait-for-port.sh 8001 # DynamoDB + .github/wait-for-port.sh 9043 # Cassandra .github/wait-for-port.sh 8080 # Spark master .github/wait-for-port.sh 8081 # Spark worker - name: Run tests diff --git a/build.sbt b/build.sbt index d978aa3c..313c239f 100644 --- a/build.sbt +++ b/build.sbt @@ -3,12 +3,15 @@ import sbt.librarymanagement.InclExclRule val awsSdkVersion = "1.11.728" val sparkVersion = "2.4.4" +inThisBuild( + List( + organization := "com.scylladb", + scalaVersion := "2.11.12", + scalacOptions += "-target:jvm-1.8" + ) +) + lazy val migrator = (project in file("migrator")).settings( - inThisBuild( - List( - organization := "com.scylladb", - scalaVersion := "2.11.12" - )), name := "scylla-migrator", version := "0.0.1", mainClass := Some("com.scylladb.migrator.Migrator"), @@ -72,6 +75,7 @@ lazy val migrator = (project in file("migrator")).settings( lazy val tests = project.in(file("tests")).settings( libraryDependencies ++= Seq( "com.amazonaws" % "aws-java-sdk-dynamodb" % awsSdkVersion, + "org.apache.cassandra" % "java-driver-query-builder" % "4.18.0", "org.scalameta" %% "munit" % "0.7.29", "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" ), diff --git a/docker-compose-tests.yml b/docker-compose-tests.yml index 5a1808c4..d55f3c62 100644 --- a/docker-compose-tests.yml +++ b/docker-compose-tests.yml @@ -14,6 +14,17 @@ services: - "8001:8000" working_dir: /home/dynamodblocal + cassandra: + image: cassandra:latest + networks: + - scylla + volumes: + - ./tests/docker/cassandra:/var/lib/cassandra + ports: + - "9043:9042" + expose: + - 9043 + scylla: image: scylladb/scylla:latest networks: @@ -22,6 +33,7 @@ services: - "./tests/docker/scylla:/var/lib/scylla" ports: - "8000:8000" + - "9042:9042" command: "--smp 2 --memory 2048M --alternator-port 8000 --alternator-write-isolation always_use_lwt" spark-master: @@ -54,6 +66,7 @@ services: volumes: - ./migrator/target/scala-2.11:/jars - ./tests/src/test/configurations:/app/configurations + - ./tests/docker/spark-master:/app/savepoints # Workaround for https://github.com/awslabs/emr-dynamodb-connector/issues/50 - ${PWD}/tests/docker/job-flow.json:/mnt/var/lib/info/job-flow.json diff --git a/tests/docker/.gitignore b/tests/docker/.gitignore new file mode 100644 index 00000000..7deefc53 --- /dev/null +++ b/tests/docker/.gitignore @@ -0,0 +1,3 @@ +cassandra/ +scylla/ +spark-master/ diff --git a/tests/src/test/configurations/cassandra-to-scylla-basic.yaml b/tests/src/test/configurations/cassandra-to-scylla-basic.yaml new file mode 100644 index 00000000..b046cac7 --- /dev/null +++ b/tests/src/test/configurations/cassandra-to-scylla-basic.yaml @@ -0,0 +1,43 @@ +source: + type: cassandra + host: cassandra + port: 9042 + localDC: datacenter1 + credentials: + username: dummy + password: dummy + keyspace: test + table: basictest + consistencyLevel: LOCAL_QUORUM + preserveTimestamps: true + splitCount: 8 + connections: 8 + fetchSize: 1000 + +target: + type: scylla + host: scylla + port: 9042 + localDC: datacenter1 + credentials: + username: dummy + password: dummy + keyspace: test + table: basictest + consistencyLevel: LOCAL_QUORUM + connections: 16 + stripTrailingZerosForDecimals: false + +renames: [] + +savepoints: + path: /app/savepoints + intervalSeconds: 300 +skipTokenRanges: [] +validation: + compareTimestamps: true + ttlToleranceMillis: 60000 + writetimeToleranceMillis: 1000 + failuresToFetch: 100 + floatingPointTolerance: 0.001 + timestampMsTolerance: 0 diff --git a/tests/src/test/resources/application.conf b/tests/src/test/resources/application.conf new file mode 100644 index 00000000..53ce78e4 --- /dev/null +++ b/tests/src/test/resources/application.conf @@ -0,0 +1,5 @@ +datastax-java-driver { + basic.request { + timeout = 10 seconds + } +} diff --git a/tests/src/test/scala/com/scylladb/migrator/alternator/MigratorSuite.scala b/tests/src/test/scala/com/scylladb/migrator/alternator/MigratorSuite.scala index a74a960c..107586be 100644 --- a/tests/src/test/scala/com/scylladb/migrator/alternator/MigratorSuite.scala +++ b/tests/src/test/scala/com/scylladb/migrator/alternator/MigratorSuite.scala @@ -79,7 +79,7 @@ trait MigratorSuite extends munit.FunSuite { } } catch { case any: Throwable => - fail(s"Failed to created table ${name} in database ${sourceDDb}: ${any}") + fail(s"Failed to created table ${name} in database ${sourceDDb}", any) } name }, diff --git a/tests/src/test/scala/com/scylladb/migrator/scylla/BasicMigrationTest.scala b/tests/src/test/scala/com/scylladb/migrator/scylla/BasicMigrationTest.scala new file mode 100644 index 00000000..9f7c181d --- /dev/null +++ b/tests/src/test/scala/com/scylladb/migrator/scylla/BasicMigrationTest.scala @@ -0,0 +1,43 @@ +package com.scylladb.migrator.scylla + +import com.datastax.oss.driver.api.querybuilder.QueryBuilder +import com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal +import com.datastax.oss.driver.api.querybuilder.term.Term + +import scala.jdk.CollectionConverters._ +import scala.util.chaining._ + +class BasicMigrationTest extends MigratorSuite { + + withTable("BasicTest").test("Read from source and write to target") { tableName => + val insertStatement = + QueryBuilder + .insertInto(keyspace, tableName) + .values(Map[String, Term]( + "id" -> literal("12345"), + "foo" -> literal("bar") + ).asJava) + .build() + + // Insert some items + sourceCassandra.execute(insertStatement) + + // Perform the migration + submitSparkJob("cassandra-to-scylla-basic.yaml") + + // Check that the item has been migrated to the target table + val selectAllStatement = QueryBuilder + .selectFrom(keyspace, tableName) + .all() + .build() + targetScylla.execute(selectAllStatement).tap { resultSet => + val rows = resultSet.all().asScala + assertEquals(rows.size, 1) + val row = rows.head + assertEquals(row.getColumnDefinitions.size(), 2) + assertEquals(row.getString("id"), "12345") + assertEquals(row.getString("foo"), "bar") + } + } + +} diff --git a/tests/src/test/scala/com/scylladb/migrator/scylla/MigratorSuite.scala b/tests/src/test/scala/com/scylladb/migrator/scylla/MigratorSuite.scala new file mode 100644 index 00000000..a497e24f --- /dev/null +++ b/tests/src/test/scala/com/scylladb/migrator/scylla/MigratorSuite.scala @@ -0,0 +1,136 @@ +package com.scylladb.migrator.scylla + +import com.datastax.oss.driver.api.core.CqlSession +import com.datastax.oss.driver.api.core.`type`.DataTypes +import com.datastax.oss.driver.api.querybuilder.SchemaBuilder + +import java.net.InetSocketAddress +import scala.sys.process.Process +import scala.jdk.CollectionConverters._ +import scala.util.chaining._ + +/** + * Base class for implementing end-to-end tests. + * + * It expects external services (Cassandra, Scylla, Spark, etc.) to be running. + * See the files `CONTRIBUTING.md` and `docker-compose-tests.yml` for more information. + */ +trait MigratorSuite extends munit.FunSuite { + + val keyspace = "test" + + /** Client of a source Cassandra instance */ + val sourceCassandra: CqlSession = CqlSession + .builder() + .addContactPoint(new InetSocketAddress("localhost", 9043)) + .withLocalDatacenter("datacenter1") + .withAuthCredentials("dummy", "dummy") + .build() + + /** Client of a target ScyllaDB instance */ + val targetScylla: CqlSession = CqlSession + .builder() + .addContactPoint(new InetSocketAddress("localhost", 9042)) + .withLocalDatacenter("datacenter1") + .withAuthCredentials("dummy", "dummy") + .build() + + /** + * Fixture automating the house-keeping work when migrating a table. + * + * It deletes the table from both the source and target databases in case it was already + * existing, and then recreates it in the source database. + * + * After the test is executed, it deletes the table from both the source and target + * databases. + * + * @param name Name of the table + */ + def withTable(name: String): FunFixture[String] = FunFixture( + setup = { _ => + def dropAndRecreateTable(database: CqlSession): Unit = + try { + val dropTableStatement = + SchemaBuilder + .dropTable(keyspace, name) + .ifExists() + .build() + database + .execute(dropTableStatement) + .ensuring(_.wasApplied()) + val createTableStatement = + SchemaBuilder + .createTable(keyspace, name) + .withPartitionKey("id", DataTypes.TEXT) + .withColumn("foo", DataTypes.TEXT) + .build() + database + .execute(createTableStatement) + .ensuring(_.wasApplied()) + } catch { + case any: Throwable => + fail(s"Something did not work as expected", any) + } + // Make sure the source and target databases do not contain the table already + dropAndRecreateTable(sourceCassandra) + dropAndRecreateTable(targetScylla) + name + }, + teardown = { _ => + // Clean-up both the source and target databases + val dropTableQuery = SchemaBuilder.dropTable(keyspace, name).build() + targetScylla.execute(dropTableQuery) + sourceCassandra.execute(dropTableQuery) + () + } + ) + + /** + * Run a migration by submitting a Spark job to the Spark cluster. + * @param migratorConfigFile Configuration file to use. Write your + * configuration files in the directory + * `src/test/configurations`, which is + * automatically mounted to the Spark + * cluster by Docker Compose. + */ + def submitSparkJob(migratorConfigFile: String): Unit = { + Process( + Seq( + "docker", + "compose", + "-f", "docker-compose-tests.yml", + "exec", + "spark-master", + "/spark/bin/spark-submit", + "--class", "com.scylladb.migrator.Migrator", + "--master", "spark://spark-master:7077", + "--conf", "spark.driver.host=spark-master", + "--conf", s"spark.scylla.config=/app/configurations/${migratorConfigFile}", + // Uncomment one of the following lines to plug a remote debugger on the Spark master or worker. + // "--conf", "spark.driver.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005", + // "--conf", "spark.executor.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006", + "/jars/scylla-migrator-assembly-0.0.1.jar" + ) + ).run().exitValue().tap { statusCode => + assertEquals(statusCode, 0, "Spark job failed") + } + () + } + + override def beforeAll(): Unit = { + val keyspaceStatement = + SchemaBuilder + .createKeyspace(keyspace) + .ifNotExists() + .withReplicationOptions(Map[String, AnyRef]("class" -> "SimpleStrategy", "replication_factor" -> new Integer(1)).asJava) + .build() + sourceCassandra.execute(keyspaceStatement) + targetScylla.execute(keyspaceStatement) + } + + override def afterAll(): Unit = { + sourceCassandra.close() + targetScylla.close() + } + +}