Commit
Add tests that access AWS
- Test authentication via AssumeRole
- Test `streamChanges: true` both with and without `skipInitialSnapshotTransfer`
julienrf committed Aug 26, 2024
1 parent b9be9fb commit e2cce42
Showing 25 changed files with 562 additions and 102 deletions.
43 changes: 43 additions & 0 deletions .github/workflows/tests-aws.yml
@@ -0,0 +1,43 @@
name: Tests with AWS
on:
push:
branches:
- master
paths:
- '**.scala'
- '**.sbt'
workflow_dispatch:

jobs:
test:
name: Test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Cache Docker images
uses: ScribeMD/docker-cache@<version>
with:
key: docker-${{ runner.os }}-${{ hashFiles('docker-compose-tests.yml') }}
- uses: actions/setup-java@v4
with:
distribution: temurin
java-version: 8
cache: sbt
- name: Build migrator
run: ./build.sh
- name: Set up services
run: |
docker compose -f docker-compose-tests.yml up -d scylla spark-master spark-worker
.github/wait-for-port.sh 8000 # ScyllaDB Alternator
.github/wait-for-cql.sh scylla
.github/wait-for-port.sh 8080 # Spark master
.github/wait-for-port.sh 8081 # Spark worker
- name: Run tests accessing AWS
run: sbt "testOnly -- --include-categories=com.scylladb.migrator.AWS"
env:
TEST_ON_AWS: true
AWS_ACCESS_KEY: ${{ secrets.AWS_ACCESS_KEY }}
AWS_SECRET_KEY: ${{ secrets.AWS_SECRET_KEY }}
AWS_ROLE_ARN: ${{ secrets.AWS_ROLE_ARN }}
- name: Stop services
run: docker compose -f docker-compose-tests.yml down
10 changes: 8 additions & 2 deletions .github/workflows/tests.yml
@@ -3,7 +3,13 @@ on:
push:
branches:
- master
paths:
- '**.scala'
- '**.sbt'
pull_request:
paths:
- '**.scala'
- '**.sbt'

jobs:
test:
@@ -33,7 +39,7 @@ jobs:
.github/wait-for-cql.sh scylla-source
.github/wait-for-port.sh 8080 # Spark master
.github/wait-for-port.sh 8081 # Spark worker
- name: Run tests
run: sbt test
- name: Run tests locally
run: sbt "testOnly -- --exclude-categories=com.scylladb.migrator.AWS"
- name: Stop services
run: docker compose -f docker-compose-tests.yml down
8 changes: 7 additions & 1 deletion .github/workflows/tutorial-dynamodb.yaml
@@ -3,7 +3,13 @@ on:
push:
branches:
- master
paths:
- '**.scala'
- '**.sbt'
pull_request:
paths:
- '**.scala'
- '**.sbt'

env:
TUTORIAL_DIR: docs/source/tutorials/dynamodb-to-scylladb-alternator
@@ -17,7 +23,7 @@ jobs:
- name: Cache Docker images
uses: ScribeMD/docker-cache@<version>
with:
key: docker-${{ runner.os }}-${{ hashFiles('docker-compose-tests.yml') }}
key: docker-${{ runner.os }}-${{ hashFiles('docs/source/tutorials/dynamodb-to-scylladb-alternator/docker-compose.yaml') }}
- uses: actions/setup-java@v4
with:
distribution: temurin
13 changes: 11 additions & 2 deletions CONTRIBUTING.md
@@ -16,10 +16,10 @@ Tests are implemented in the `tests` sbt submodule. They simulate the submission
docker compose -f docker-compose-tests.yml up
~~~

3. Run the tests
3. Run the tests locally

~~~ sh
sbt test
sbt "testOnly -- --exclude-categories=com.scylladb.migrator.AWS"
~~~

Or, to run a single test:
@@ -28,6 +28,15 @@ Tests are implemented in the `tests` sbt submodule. They simulate the submission
sbt "testOnly com.scylladb.migrator.BasicMigrationTest"
~~~

Or, to run the tests that access AWS:

~~~ sh
AWS_ACCESS_KEY=... \
AWS_SECRET_KEY=... \
AWS_ROLE_ARN=... \
sbt "testOnly -- --include-categories=com.scylladb.migrator.AWS"
~~~

4. Ultimately, stop the Docker containers

~~~ sh
2 changes: 1 addition & 1 deletion build.sbt
@@ -86,7 +86,7 @@ lazy val tests = project.in(file("tests")).settings(
"org.apache.cassandra" % "java-driver-query-builder" % "4.18.0",
"com.github.mjakubowski84" %% "parquet4s-core" % "1.9.4",
"org.apache.hadoop" % "hadoop-client" % hadoopVersion,
"org.scalameta" %% "munit" % "0.7.29"
"org.scalameta" %% "munit" % "1.0.1"
),
Test / parallelExecution := false,
// Needed to build a Spark session on Java 17+, see https://stackoverflow.com/questions/73465937/apache-spark-3-3-0-breaks-on-java-17-with-cannot-access-class-sun-nio-ch-direct
3 changes: 2 additions & 1 deletion migrator/src/main/scala/com/scylladb/migrator/Migrator.scala
@@ -10,11 +10,12 @@ object Migrator {
val log = LogManager.getLogger("com.scylladb.migrator")

def main(args: Array[String]): Unit = {
implicit val spark = SparkSession
implicit val spark: SparkSession = SparkSession
.builder()
.appName("scylla-migrator")
.config("spark.task.maxFailures", "1024")
.config("spark.stage.maxConsecutiveAttempts", "60")
.config("spark.streaming.stopGracefullyOnShutdown", "true")
.getOrCreate()

Logger.getRootLogger.setLevel(Level.WARN)
@@ -0,0 +1,25 @@
source:
type: dynamodb
table: AssumeRoleTest
region: us-west-1
credentials:
accessKey: "{AWS_ACCESS_KEY}"
secretKey: "{AWS_SECRET_KEY}"
assumeRole:
arn: "{AWS_ROLE_ARN}"

target:
type: dynamodb
table: AssumeRoleTest
region: dummy
endpoint:
host: http://scylla
port: 8000
credentials:
accessKey: dummy
secretKey: dummy
streamChanges: false

savepoints:
path: /app/savepoints
intervalSeconds: 300
@@ -0,0 +1,26 @@
source:
type: dynamodb
table: StreamedItemsSkipSnapshotTest
region: us-west-1
credentials:
accessKey: "{AWS_ACCESS_KEY}"
secretKey: "{AWS_SECRET_KEY}"
assumeRole:
arn: "{AWS_ROLE_ARN}"

target:
type: dynamodb
table: StreamedItemsSkipSnapshotTest
region: dummy
endpoint:
host: http://scylla
port: 8000
credentials:
accessKey: dummy
secretKey: dummy
streamChanges: true
skipInitialSnapshotTransfer: true

savepoints:
path: /app/savepoints
intervalSeconds: 300
@@ -0,0 +1,25 @@
source:
type: dynamodb
table: StreamedItemsTest
region: us-west-1
credentials:
accessKey: "{AWS_ACCESS_KEY}"
secretKey: "{AWS_SECRET_KEY}"
assumeRole:
arn: "{AWS_ROLE_ARN}"

target:
type: dynamodb
table: StreamedItemsTest
region: dummy
endpoint:
host: http://scylla
port: 8000
credentials:
accessKey: dummy
secretKey: dummy
streamChanges: true

savepoints:
path: /app/savepoints
intervalSeconds: 300
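
The `{AWS_ACCESS_KEY}`, `{AWS_SECRET_KEY}` and `{AWS_ROLE_ARN}` placeholders in these configuration files are filled in at test time from the environment (see `setupConfigurationFile` in `AssumeRoleTest` below). A minimal sketch of that kind of substitution, assuming the environment variables used elsewhere in this commit — the helper shown here is hypothetical, not the repository's actual implementation:

~~~ scala
import java.nio.charset.StandardCharsets
import java.nio.file.{ Files, Paths }

// Hypothetical helper: render a configuration template by replacing each
// {NAME} placeholder with the value of the corresponding environment variable.
def renderConfiguration(templatePath: String, outputPath: String): Unit = {
  val template = new String(Files.readAllBytes(Paths.get(templatePath)), StandardCharsets.UTF_8)
  val rendered =
    Seq("AWS_ACCESS_KEY", "AWS_SECRET_KEY", "AWS_ROLE_ARN").foldLeft(template) { (config, name) =>
      config.replace(s"{$name}", sys.env.getOrElse(name, sys.error(s"Missing environment variable $name")))
    }
  Files.write(Paths.get(outputPath), rendered.getBytes(StandardCharsets.UTF_8))
  ()
}
~~~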
4 changes: 4 additions & 0 deletions tests/src/test/scala/com/scylladb/migrator/AWS.scala
@@ -0,0 +1,4 @@
package com.scylladb.migrator

/** Used for tagging test suites that require AWS access */
class AWS extends munit.Tag("AWS")
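
With munit, a suite (or a shared base suite) can opt into a category by being annotated with such a tag class, which is what makes the `--include-categories` / `--exclude-categories` filters in the workflows and in CONTRIBUTING.md work. A sketch, assuming munit's category filtering; the suite below is illustrative and not part of the commit:

~~~ scala
package com.scylladb.migrator

// Hypothetical suite tagged as requiring AWS access.
@AWS
class ExampleAwsSuite extends munit.FunSuite {
  test("reads credentials from the environment") {
    assert(sys.env.contains("AWS_ACCESS_KEY"))
  }
}
~~~

Such a suite is then selected with `sbt "testOnly -- --include-categories=com.scylladb.migrator.AWS"` or skipped with `--exclude-categories`, as done in the CI workflows above.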
17 changes: 10 additions & 7 deletions tests/src/test/scala/com/scylladb/migrator/SparkUtils.scala
@@ -1,6 +1,6 @@
package com.scylladb.migrator

import scala.sys.process.Process
import scala.sys.process.{ Process, ProcessBuilder }

object SparkUtils {

@@ -21,12 +21,15 @@ object SparkUtils {
()
}

/**
* @param migratorConfigFile Configuration file to use
* @param entryPoint Java entry point of the job
* @return The running process
*/
def submitSparkJob(migratorConfigFile: String, entryPoint: String): Process =
submitSparkJobProcess(migratorConfigFile, entryPoint).run()

/**
* @param migratorConfigFile Configuration file to use
* @param entryPoint Java entry point of the job
* @return The running process
*/
def submitSparkJobProcess(migratorConfigFile: String, entryPoint: String): ProcessBuilder =
Process(
Seq(
"docker",
@@ -49,6 +52,6 @@ object SparkUtils {
// "--conf", "spark.executor.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=*:5006",
"/jars/scylla-migrator-assembly.jar"
)
).run()
)

}
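
Splitting `submitSparkJobProcess` out of `submitSparkJob` lets tests that exercise `streamChanges: true` keep a handle on the long-running job and stop it once the streamed items have been verified, instead of waiting for it to exit. A sketch of that usage pattern with `scala.sys.process` — the configuration file name and the surrounding test logic are illustrative, not the repository's actual code:

~~~ scala
import com.scylladb.migrator.SparkUtils.submitSparkJobProcess

object StreamingJobExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical streaming-test skeleton: start the migration, let it replicate
    // changes, then terminate the spark-submit process once assertions are done.
    val job = submitSparkJobProcess(
      "dynamodb-to-alternator-streaming.yaml", // illustrative config name
      "com.scylladb.migrator.Migrator"
    ).run()
    try {
      // Put items in the source table and assert they appear in the target.
    } finally {
      job.destroy()   // stop the long-running streaming job
      job.exitValue() // block until the process has actually terminated
    }
  }
}
~~~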
@@ -0,0 +1,30 @@
package com.scylladb.migrator.alternator

import com.scylladb.migrator.SparkUtils.successfullyPerformMigration
import software.amazon.awssdk.services.dynamodb.model.{AttributeValue, PutItemRequest}

import scala.jdk.CollectionConverters._

/** Basic migration that uses the real AWS DynamoDB as a source and AssumeRole for authentication */
class AssumeRoleTest extends MigratorSuiteWithAWS {

withTable("AssumeRoleTest").test("Read from source and write to target") { tableName =>
val configFileName = "dynamodb-to-alternator-assume-role.yaml"

setupConfigurationFile(configFileName)

// Insert some items
val keys = Map("id" -> AttributeValue.fromS("12345"))
val attrs = Map("foo" -> AttributeValue.fromS("bar"))
val itemData = keys ++ attrs
sourceDDb().putItem(PutItemRequest.builder().tableName(tableName).item(itemData.asJava).build())

// Perform the migration
successfullyPerformMigration(configFileName)

checkSchemaWasMigrated(tableName)

checkItemWasMigrated(tableName, keys, itemData)
}

}
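
For context, assuming the AWS SDK v2 (already used by these tests for DynamoDB), resolving AssumeRole credentials roughly amounts to the following — a sketch of the mechanism being exercised, not the migrator's actual code path; the session name is made up:

~~~ scala
import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, StaticCredentialsProvider }
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.dynamodb.DynamoDbClient
import software.amazon.awssdk.services.sts.StsClient
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest

object AssumeRoleSketch {
  def main(args: Array[String]): Unit = {
    // Long-lived credentials of the IAM user that is allowed to assume the role.
    val baseCredentials = StaticCredentialsProvider.create(
      AwsBasicCredentials.create(sys.env("AWS_ACCESS_KEY"), sys.env("AWS_SECRET_KEY")))

    val sts = StsClient.builder().region(Region.US_WEST_1).credentialsProvider(baseCredentials).build()

    // Temporary credentials obtained by assuming the configured role.
    val assumedRole = StsAssumeRoleCredentialsProvider.builder()
      .stsClient(sts)
      .refreshRequest(
        AssumeRoleRequest.builder()
          .roleArn(sys.env("AWS_ROLE_ARN"))
          .roleSessionName("scylla-migrator-tests") // hypothetical session name
          .build())
      .build()

    // DynamoDB client authenticated with the assumed-role credentials.
    val dynamoDb = DynamoDbClient.builder().region(Region.US_WEST_1).credentialsProvider(assumedRole).build()
    println(dynamoDb.listTables().tableNames())
  }
}
~~~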
@@ -5,15 +5,15 @@ import software.amazon.awssdk.services.dynamodb.model.{AttributeValue, PutItemRequest}

import scala.jdk.CollectionConverters._

class BasicMigrationTest extends MigratorSuite {
class BasicMigrationTest extends MigratorSuiteWithDynamoDBLocal {

withTable("BasicTest").test("Read from source and write to target") { tableName =>
val keys = Map("id" -> AttributeValue.fromS("12345"))
val attrs = Map("foo" -> AttributeValue.fromS("bar"))
val itemData = keys ++ attrs

// Insert some items
sourceDDb.putItem(PutItemRequest.builder().tableName(tableName).item(itemData.asJava).build())
sourceDDb().putItem(PutItemRequest.builder().tableName(tableName).item(itemData.asJava).build())

// Perform the migration
successfullyPerformMigration("dynamodb-to-alternator-basic.yaml")
@@ -13,7 +13,7 @@ import java.nio.file.{Files, Path, Paths}
import java.util.function.Consumer
import scala.jdk.CollectionConverters._

class DynamoDBS3ExportMigrationTest extends MigratorSuite {
class DynamoDBS3ExportMigrationTest extends MigratorSuiteWithDynamoDBLocal {

val s3Client: S3Client =
S3Client
@@ -90,13 +90,13 @@ class DynamoDBS3ExportMigrationTest extends MigratorSuite {
// Make sure to properly set up and clean up the target database and the S3 instance
def withResources(bucketName: String, tableName: String): FunFixture[(String, String)] = FunFixture(
setup = _ => {
deleteTableIfExists(targetAlternator, tableName)
deleteTableIfExists(targetAlternator(), tableName)
deleteBucket(bucketName)
s3Client.createBucket(CreateBucketRequest.builder().bucket(bucketName).build())
(bucketName, tableName)
},
teardown = _ => {
targetAlternator.deleteTable(DeleteTableRequest.builder().tableName(tableName).build())
targetAlternator().deleteTable(DeleteTableRequest.builder().tableName(tableName).build())
deleteBucket(bucketName)
}
)
@@ -6,7 +6,7 @@ import software.amazon.awssdk.services.dynamodb.model.{AttributeValue, PutItemRequest}
import scala.jdk.CollectionConverters._

// Reproduction of https://github.com/scylladb/scylla-migrator/issues/103
class Issue103Test extends MigratorSuite {
class Issue103Test extends MigratorSuiteWithDynamoDBLocal {

withTable("Issue103Items").test("Issue #103 is fixed") { tableName =>
// Insert two items
@@ -27,8 +27,8 @@ class Issue103Test extends MigratorSuite {
)
val item2Data = keys2 ++ attrs2

sourceDDb.putItem(PutItemRequest.builder().tableName(tableName).item(item1Data.asJava).build())
sourceDDb.putItem(PutItemRequest.builder().tableName(tableName).item(item2Data.asJava).build())
sourceDDb().putItem(PutItemRequest.builder().tableName(tableName).item(item1Data.asJava).build())
sourceDDb().putItem(PutItemRequest.builder().tableName(tableName).item(item2Data.asJava).build())

// Perform the migration
successfullyPerformMigration("dynamodb-to-alternator-issue-103.yaml")
