Commit 00412dc

Merge branch 'julienrf-aws-credentials'
2 parents: 5f6abb2 + 7858b25

25 files changed: +592 -355 lines

.github/wait-for-port.sh

Lines changed: 11 additions & 0 deletions
@@ -0,0 +1,11 @@
+#!/usr/bin/env bash
+
+port=$1
+echo "Waiting for port ${port}"
+attempts=0
+max_attempts=30
+while ! nc -z 127.0.0.1 $port && [[ $attempts < $max_attempts ]] ; do
+  attempts=$((attempts+1))
+  sleep 1;
+  echo "waiting... (${attempts}/${max_attempts})"
+done

.github/workflows/tests.yml

Lines changed: 7 additions & 3 deletions
@@ -2,8 +2,7 @@ name: Tests
 on:
   push:
     branches:
-      - master
-      - actions-test
+      - master
   pull_request:

 jobs:
@@ -24,7 +23,12 @@ jobs:
       - name: Build migrator
         run: ./build.sh
       - name: Set up services
-        run: docker compose -f docker-compose-tests.yml up --wait
+        run: |
+          docker compose -f docker-compose-tests.yml up -d
+          .github/wait-for-port.sh 8000 # ScyllaDB
+          .github/wait-for-port.sh 8001 # DynamoDB
+          .github/wait-for-port.sh 8080 # Spark master
+          .github/wait-for-port.sh 8081 # Spark worker
       - name: Run tests
         run: sbt test
       - name: Stop services

.gitmodules

Lines changed: 0 additions & 3 deletions
@@ -2,9 +2,6 @@
     path = spark-cassandra-connector
     url = https://github.com/scylladb/spark-cassandra-connector
     branch = feature/track-token-ranges
-[submodule "spark-dynamodb"]
-    path = spark-dynamodb
-    url = https://github.com/scylladb/spark-dynamodb
 [submodule "spark-kinesis"]
     path = spark-kinesis
     url = https://github.com/scylladb/spark-kinesis

.travis.yml

Lines changed: 0 additions & 15 deletions
This file was deleted.

CONTRIBUTING.md

Lines changed: 35 additions & 0 deletions
@@ -41,3 +41,38 @@ sbt migrator/assembly
 ~~~

 And then re-run the tests.
+
+## Debugging
+
+The tests involve the execution of code on several locations:
+- locally (ie, on the machine where you invoke `sbt test`): tests initialization and assertions
+- on the Spark master node: the `Migrator` entry point
+- on the Spark worker node: RDD operations
+
+In all those cases, it is possible to debug them by using the Java Debug Wire Protocol.
+
+### Local Debugging
+
+Follow the procedure documented [here](https://stackoverflow.com/a/15505308/561721).
+
+### Debugging on the Spark Master Node
+
+1. In the file `MigratorSuite.scala`, uncomment the line that sets the
+   `spark.driver.extraJavaOptions`.
+2. Set up the remote debugger of your IDE to listen to the port 5005.
+3. Run a test
+4. When the test starts a Spark job, it waits for the remote debugger
+5. Start the remote debugger from your IDE.
+6. The test execution resumes, and you can interact with it from your debugger.
+
+### Debugging on the Spark Worker Node
+
+1. In the file `MigratorSuite.scala`, uncomment the line that sets the
+   `spark.executor.extraJavaOptions`.
+2. Set up the remote debugger of your IDE to listen to the port 5006.
+3. Run a test
+4. When the test starts an RDD operation, it waits for the remote debugger.
+   Note that the Spark master node will not display the output of the worker node,
+   but you can see it in the worker web UI: http://localhost:8081/.
+5. Start the remote debugger from your IDE.
+6. The test execution resumes, and you can interact with it from your debugger.
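
The lines to uncomment live in `MigratorSuite.scala` and are not shown in this diff. As a rough, hypothetical sketch (the suite's actual setup may differ), the JDWP agent options behind those two Spark properties typically look like the following; `suspend=y` makes the JVM block until a debugger attaches on the exposed port (5005 for the driver, 5006 for the executors):

import org.apache.spark.SparkConf

// Hypothetical sketch only; the real (commented-out) settings are in MigratorSuite.scala.
val debugConf = new SparkConf()
  // Driver side (runs on the Spark master container): debugger attaches to port 5005.
  .set(
    "spark.driver.extraJavaOptions",
    "-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005")
  // Executor side (runs on the Spark worker container): debugger attaches to port 5006.
  .set(
    "spark.executor.extraJavaOptions",
    "-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006")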

build.sbt

Lines changed: 1 addition & 0 deletions
@@ -30,6 +30,7 @@ lazy val migrator = (project in file("migrator")).settings(
     "com.amazonaws" % "aws-java-sdk-dynamodb" % awsSdkVersion,
     ("com.amazonaws" % "dynamodb-streams-kinesis-adapter" % "1.5.2")
       .excludeAll(InclExclRule("com.fasterxml.jackson.core")),
+    "com.amazon.emr" % "emr-dynamodb-hadoop" % "4.16.0",
     "org.yaml" % "snakeyaml" % "1.23",
     "io.circe" %% "circe-yaml" % "0.9.0",
     "io.circe" %% "circe-generic" % "0.9.0",

build.sh

Lines changed: 0 additions & 4 deletions
@@ -14,9 +14,6 @@ trap "rm -rf $TMPDIR" EXIT
 pushd spark-cassandra-connector
 sbt -Djava.io.tmpdir="$TMPDIR" ++2.11.12 assembly
 popd
-pushd spark-dynamodb
-sbt assembly
-popd
 pushd spark-kinesis
 sbt assembly
 popd
@@ -26,7 +23,6 @@ if [ ! -d "./migrator/lib" ]; then
 fi

 cp ./spark-cassandra-connector/connector/target/scala-2.11/spark-cassandra-connector-assembly-*.jar ./migrator/lib
-cp ./spark-dynamodb/target/scala-2.11/spark-dynamodb-assembly-*.jar ./migrator/lib
 cp ./spark-kinesis/target/scala-2.11/spark-streaming-kinesis-asl-assembly-*.jar ./migrator/lib

 sbt -Djava.io.tmpdir="$TMPDIR" migrator/assembly

docker-compose-tests.yml

Lines changed: 6 additions & 0 deletions
@@ -36,6 +36,7 @@ services:
     networks:
       - scylla
     expose:
+      - 5005
       - 7001
       - 7002
       - 7003
@@ -46,12 +47,15 @@ services:
       - 6066
     ports:
       - 4040:4040
+      - 5005:5005
       - 6066:6066
       - 7077:7077
       - 8080:8080
     volumes:
       - ./migrator/target/scala-2.11:/jars
       - ./tests/src/test/configurations:/app/configurations
+      # Workaround for https://github.com/awslabs/emr-dynamodb-connector/issues/50
+      - ${PWD}/tests/docker/job-flow.json:/mnt/var/lib/info/job-flow.json

   spark-worker:
     image: bde2020/spark-worker:2.4.4-hadoop2.7
@@ -65,13 +69,15 @@ services:
     networks:
       - scylla
     expose:
+      - 5006
       - 7012
       - 7013
       - 7014
       - 7015
       - 7016
       - 8881
     ports:
+      - 5006:5006
       - 8081:8081
     depends_on:
       - spark-master

migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala

Lines changed: 66 additions & 4 deletions
@@ -4,7 +4,6 @@ import com.amazonaws.auth.AWSCredentialsProvider
 import com.amazonaws.client.builder.AwsClientBuilder
 import com.amazonaws.services.dynamodbv2.{
   AmazonDynamoDBClientBuilder,
-  AmazonDynamoDBStreamsClient,
   AmazonDynamoDBStreamsClientBuilder
 }
 import com.amazonaws.services.dynamodbv2.model.{
@@ -17,7 +16,14 @@ import com.amazonaws.services.dynamodbv2.model.{
   TableDescription,
   UpdateTableRequest
 }
-import com.scylladb.migrator.config.{ DynamoDBEndpoint, SourceSettings, TargetSettings }
+import com.scylladb.migrator.config.{
+  AWSCredentials,
+  DynamoDBEndpoint,
+  SourceSettings,
+  TargetSettings
+}
+import org.apache.hadoop.dynamodb.DynamoDBConstants
+import org.apache.hadoop.mapred.JobConf
 import org.apache.log4j.LogManager

 import scala.util.{ Failure, Success, Try }
@@ -64,7 +70,8 @@ object DynamoUtils {

   def enableDynamoStream(source: SourceSettings.DynamoDB): Unit = {
     val sourceClient = buildDynamoClient(source.endpoint, source.credentials, source.region)
-    val sourceStreamsClient = buildDynamoStreamsClient(source.credentials, source.region)
+    val sourceStreamsClient =
+      buildDynamoStreamsClient(source.endpoint, source.credentials, source.region)

     sourceClient
       .updateTable(
@@ -114,12 +121,67 @@ object DynamoUtils {
     builder.build()
   }

-  def buildDynamoStreamsClient(creds: Option[AWSCredentialsProvider], region: Option[String]) = {
+  def buildDynamoStreamsClient(endpoint: Option[DynamoDBEndpoint],
+                               creds: Option[AWSCredentialsProvider],
+                               region: Option[String]) = {
     val builder = AmazonDynamoDBStreamsClientBuilder.standard()

+    endpoint.foreach { endpoint =>
+      builder
+        .withEndpointConfiguration(
+          new AwsClientBuilder.EndpointConfiguration(
+            endpoint.renderEndpoint,
+            region.getOrElse("empty")))
+    }
     creds.foreach(builder.withCredentials)
     region.foreach(builder.withRegion)

     builder.build()
   }
+
+  /**
+    * Optionally set a configuration. If `maybeValue` is empty, nothing is done. Otherwise,
+    * its value is set to the `name` property on the `jobConf`.
+    *
+    * @param jobConf    Target JobConf to configure
+    * @param name       Name of the Hadoop configuration key
+    * @param maybeValue Optional value to set.
+    */
+  def setOptionalConf(jobConf: JobConf, name: String, maybeValue: Option[String]): Unit =
+    for (value <- maybeValue) {
+      jobConf.set(name, value)
+    }
+
+  /**
+    * Set the common configuration of both read and write DynamoDB jobs.
+    * @param jobConf             Target JobConf to configure
+    * @param maybeRegion         AWS region
+    * @param maybeEndpoint       AWS endpoint
+    * @param maybeScanSegments   Scan segments
+    * @param maybeMaxMapTasks    Max map tasks
+    * @param maybeAwsCredentials AWS credentials
+    */
+  def setDynamoDBJobConf(jobConf: JobConf,
+                         maybeRegion: Option[String],
+                         maybeEndpoint: Option[DynamoDBEndpoint],
+                         maybeScanSegments: Option[Int],
+                         maybeMaxMapTasks: Option[Int],
+                         maybeAwsCredentials: Option[AWSCredentials]): Unit = {
+    setOptionalConf(jobConf, DynamoDBConstants.REGION, maybeRegion)
+    setOptionalConf(jobConf, DynamoDBConstants.ENDPOINT, maybeEndpoint.map(_.renderEndpoint))
+    setOptionalConf(jobConf, DynamoDBConstants.SCAN_SEGMENTS, maybeScanSegments.map(_.toString))
+    setOptionalConf(jobConf, DynamoDBConstants.MAX_MAP_TASKS, maybeMaxMapTasks.map(_.toString))
+    for (credentials <- maybeAwsCredentials) {
+      jobConf.set(DynamoDBConstants.DYNAMODB_ACCESS_KEY_CONF, credentials.accessKey)
+      jobConf.set(DynamoDBConstants.DYNAMODB_SECRET_KEY_CONF, credentials.secretKey)
+    }
+    jobConf.set(
+      DynamoDBConstants.CUSTOM_CREDENTIALS_PROVIDER_CONF,
+      "com.amazonaws.auth.profile.ProfileCredentialsProvider")
+    jobConf.set(
+      "mapred.output.format.class",
+      "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
+    jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
+  }
+
 }
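
For context, the new `setDynamoDBJobConf` helper centralizes the emr-dynamodb-hadoop configuration (region, endpoint, scan segments, max map tasks, and static credentials). A hypothetical caller, not part of this commit, might wire it up as sketched below; the `readDynamoTable` name and its parameter list are made up for illustration, and "dynamodb.input.tableName" is the connector's documented key for the table to scan:

import com.scylladb.migrator.DynamoUtils
import com.scylladb.migrator.config.{ AWSCredentials, DynamoDBEndpoint }
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Hypothetical sketch: configure a Hadoop job with the new helpers and scan a
// DynamoDB table as an RDD through the emr-dynamodb-hadoop input format.
def readDynamoTable(spark: SparkSession,
                    table: String,
                    region: Option[String],
                    endpoint: Option[DynamoDBEndpoint],
                    scanSegments: Option[Int],
                    maxMapTasks: Option[Int],
                    credentials: Option[AWSCredentials]): RDD[(Text, DynamoDBItemWritable)] = {
  val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration)
  // Table to scan.
  jobConf.set("dynamodb.input.tableName", table)
  // Region, endpoint, parallelism and credentials, as wired up by this commit.
  DynamoUtils.setDynamoDBJobConf(jobConf, region, endpoint, scanSegments, maxMapTasks, credentials)
  spark.sparkContext.hadoopRDD(
    jobConf,
    classOf[DynamoDBInputFormat],
    classOf[Text],
    classOf[DynamoDBItemWritable])
}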
