diff --git a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBConstants.java b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBConstants.java
index 4d951913..80bc0f47 100644
--- a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBConstants.java
+++ b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBConstants.java
@@ -60,11 +60,14 @@ public interface DynamoDBConstants {
String THROUGHPUT_READ_PERCENT = "dynamodb.throughput.read.percent";
String READ_THROUGHPUT = "dynamodb.throughput.read";
String WRITE_THROUGHPUT = "dynamodb.throughput.write";
+ String READ_THROUGHPUT_AUTOSCALING = "dynamodb.throughput.read.autoscaling";
+ String WRITE_THROUGHPUT_AUTOSCALING = "dynamodb.throughput.write.autoscaling";
String AVG_ITEM_SIZE = "dynamodb.item.average.size";
String ITEM_COUNT = "dynamodb.item.count";
String TABLE_SIZE_BYTES = "dynamodb.table.size-bytes";
String MAX_MAP_TASKS = "dynamodb.max.map.tasks";
String DEFAULT_THROUGHPUT_PERCENTAGE = "0.5";
+ String DEFAULT_THROUGHPUT_AUTOSCALING = "true";
String BILLING_MODE_PROVISIONED = BillingMode.PROVISIONED.toString();
String DYNAMODB_MAX_ITEM_SIZE = "dynamodb.max.item.size";
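For context, a minimal sketch of how a job could opt in to or out of the new behavior, assuming a plain MapReduce job that sets the keys directly (property names come from the constants above; the value "500" is illustrative):

```java
import org.apache.hadoop.dynamodb.DynamoDBConstants;
import org.apache.hadoop.mapred.JobConf;

public class AutoscalingConfigSketch {
  public static void main(String[] args) {
    JobConf jobConf = new JobConf();

    // Opt in: the IOPS calculators re-resolve capacity from DescribeTable
    // instead of trusting a snapshot taken at job-submission time.
    jobConf.set(DynamoDBConstants.READ_THROUGHPUT_AUTOSCALING, "true");

    // Opt out: pin an explicit rate; with the autoscaling flag absent or
    // "false", this value is used as-is.
    jobConf.set(DynamoDBConstants.WRITE_THROUGHPUT, "500");
  }
}
```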
diff --git a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/read/ReadIopsCalculator.java b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/read/ReadIopsCalculator.java
index 7634d965..95f83b78 100644
--- a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/read/ReadIopsCalculator.java
+++ b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/read/ReadIopsCalculator.java
@@ -55,17 +55,24 @@ public ReadIopsCalculator(JobClient jobClient, DynamoDBClient dynamoDBClient, St
}
public long calculateTargetIops() {
- double configuredThroughput = Math.floor(Double.parseDouble(
- jobConf.get(DynamoDBConstants.READ_THROUGHPUT,
- String.valueOf(getThroughput()))) * throughputPercent);
- long throughputPerTask = Math.max((long) (configuredThroughput / totalSegments
+ double configuredThroughput;
+ // Always fetch throughput from DDB when auto-scaling is enabled or no throughput is specified
+ if (Boolean.parseBoolean(jobConf.get(DynamoDBConstants.READ_THROUGHPUT_AUTOSCALING))
+ || jobConf.get(DynamoDBConstants.READ_THROUGHPUT) == null) {
+ configuredThroughput = getThroughput();
+ } else {
+ configuredThroughput = Double.parseDouble(jobConf.get(DynamoDBConstants.READ_THROUGHPUT));
+ }
+ double calculatedThroughput = Math.floor(configuredThroughput * throughputPercent);
+
+ long throughputPerTask = Math.max((long) (calculatedThroughput / totalSegments
* localSegments), 1);
log.info("Throughput per task for table " + tableName + " : " + throughputPerTask);
return throughputPerTask;
}
- private double getThroughput() {
+ protected double getThroughput() {
TableDescription tableDescription = dynamoDBClient.describeTable(tableName);
if (tableDescription.billingModeSummary() == null
|| tableDescription.billingModeSummary().billingMode() == BillingMode.PROVISIONED) {
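To make the arithmetic above concrete, here is a standalone re-derivation of the read path with sample numbers (values are illustrative, not defaults):

```java
public class ReadIopsMathSketch {
  public static void main(String[] args) {
    double configuredThroughput = 1000; // from DescribeTable or dynamodb.throughput.read
    double throughputPercent = 0.5;     // dynamodb.throughput.read.percent
    long totalSegments = 10;            // scan segments across the whole job
    long localSegments = 2;             // segments handled by this task

    double calculatedThroughput = Math.floor(configuredThroughput * throughputPercent); // 500.0
    long throughputPerTask =
        Math.max((long) (calculatedThroughput / totalSegments * localSegments), 1);
    System.out.println(throughputPerTask); // 100 read units for this task
  }
}
```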
diff --git a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/write/WriteIopsCalculator.java b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/write/WriteIopsCalculator.java
index df415769..d32685db 100644
--- a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/write/WriteIopsCalculator.java
+++ b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/write/WriteIopsCalculator.java
@@ -70,10 +70,16 @@ public WriteIopsCalculator(JobClient jobClient, DynamoDBClient dynamoDBClient, S
}
public long calculateTargetIops() {
- double configuredThroughput = Math.floor(Double.parseDouble(
- jobConf.get(DynamoDBConstants.WRITE_THROUGHPUT, String.valueOf(getThroughput())))
- * throughputPercent);
- long throughputPerTask = Math.max((long) (configuredThroughput / maxParallelTasks), 1);
+ double configuredThroughput;
+ // Always fetch throughput from DDB when auto-scaling is enabled or no throughput is specified
+ if (Boolean.parseBoolean(jobConf.get(DynamoDBConstants.WRITE_THROUGHPUT_AUTOSCALING))
+ || jobConf.get(DynamoDBConstants.WRITE_THROUGHPUT) == null) {
+ configuredThroughput = getThroughput();
+ } else {
+ configuredThroughput = Double.parseDouble(jobConf.get(DynamoDBConstants.WRITE_THROUGHPUT));
+ }
+ double calculatedThroughput = Math.floor(configuredThroughput * throughputPercent);
+ long throughputPerTask = Math.max((long) (calculatedThroughput / maxParallelTasks), 1);
log.info("Throughput per task for table " + tableName + " : " + throughputPerTask);
return throughputPerTask;
@@ -88,7 +94,7 @@ int calculateMaxMapTasks(int totalMapTasks) {
return totalMapTasks;
}
- private double getThroughput() {
+ protected double getThroughput() {
TableDescription tableDescription = dynamoDBClient.describeTable(tableName);
if (tableDescription.billingModeSummary() == null
|| tableDescription.billingModeSummary().billingMode() == BillingMode.PROVISIONED) {
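The write side follows the same shape, dividing by the number of parallel writer tasks rather than scan segments; a sketch with illustrative numbers:

```java
public class WriteIopsMathSketch {
  public static void main(String[] args) {
    double configuredThroughput = 1000; // from DescribeTable or dynamodb.throughput.write
    double throughputPercent = 0.5;     // dynamodb.throughput.write.percent
    long maxParallelTasks = 20;         // min(max concurrent map tasks, total map tasks)

    double calculatedThroughput = Math.floor(configuredThroughput * throughputPercent); // 500.0
    long throughputPerTask = Math.max((long) (calculatedThroughput / maxParallelTasks), 1);
    System.out.println(throughputPerTask); // 25 write units per task
  }
}
```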
diff --git a/emr-dynamodb-hadoop/src/test/java/org/apache/hadoop/dynamodb/read/ReadIopsCalculatorTest.java b/emr-dynamodb-hadoop/src/test/java/org/apache/hadoop/dynamodb/read/ReadIopsCalculatorTest.java
index 0c6c035e..790bea9d 100644
--- a/emr-dynamodb-hadoop/src/test/java/org/apache/hadoop/dynamodb/read/ReadIopsCalculatorTest.java
+++ b/emr-dynamodb-hadoop/src/test/java/org/apache/hadoop/dynamodb/read/ReadIopsCalculatorTest.java
@@ -14,16 +14,20 @@
package org.apache.hadoop.dynamodb.read;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.apache.hadoop.dynamodb.DynamoDBClient;
import org.apache.hadoop.dynamodb.DynamoDBConstants;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
-import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.BillingModeSummary;
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputDescription;
import software.amazon.awssdk.services.dynamodb.model.TableDescription;
@@ -43,31 +47,55 @@ public class ReadIopsCalculatorTest {
private ReadIopsCalculator readIopsCalculator;
- @Before
- public void setup() {
+ @Test
+ public void testCalculateTargetIops() {
+ JobConf jobConf = new JobConf();
+ readIopsCalculator = getReadIopsCalculator(jobConf);
+
+ long readIops = readIopsCalculator.calculateTargetIops();
+ long expectedReadIops = (long) (READ_CAPACITY_UNITS * THROUGHPUT_READ_PERCENT *
+ LOCAL_SEGMENTS / TOTAL_SEGMETNS);
+ assertEquals(expectedReadIops, readIops);
+ }
+
+ @Test
+ public void testCalculateIopsAutoscalingEnabled() {
+ JobConf jobConf = new JobConf();
+ jobConf.set(DynamoDBConstants.READ_THROUGHPUT, "500");
+ readIopsCalculator = getReadIopsCalculator(jobConf);
+
+ ReadIopsCalculator spyIopsCalculator = Mockito.spy(readIopsCalculator);
+ doReturn((double) 1000).when(spyIopsCalculator).getThroughput();
+ long readIops = spyIopsCalculator.calculateTargetIops();
+ long expectedReadIops = (long) (500 * THROUGHPUT_READ_PERCENT *
+ LOCAL_SEGMENTS / TOTAL_SEGMETNS);
+ assertEquals(expectedReadIops, readIops);
+ // Auto-scaling not enabled: throughput shouldn't be fetched when the user has specified it
+ verify(spyIopsCalculator, times(0)).getThroughput();
+
+ jobConf.set(DynamoDBConstants.READ_THROUGHPUT_AUTOSCALING, "true");
+ long readIops2 = spyIopsCalculator.calculateTargetIops();
+ long expectedReadIops2 = (long) (1000 * THROUGHPUT_READ_PERCENT *
+ LOCAL_SEGMENTS / TOTAL_SEGMETNS);
+ assertEquals(expectedReadIops2, readIops2);
+ // Auto-scaling enabled: throughput should be fetched regardless of whether the user specified it
+ verify(spyIopsCalculator, times(1)).getThroughput();
+ }
+
+ private ReadIopsCalculator getReadIopsCalculator(JobConf jobConf) {
when(dynamoDBClient.describeTable(TABLE_NAME)).thenReturn(TableDescription.builder()
.billingModeSummary(BillingModeSummary.builder()
- .billingMode(DynamoDBConstants.BILLING_MODE_PROVISIONED)
+ .billingMode(BillingMode.PROVISIONED)
.build())
.provisionedThroughput(ProvisionedThroughputDescription.builder()
.readCapacityUnits(READ_CAPACITY_UNITS)
.build())
.build());
- JobConf jobConf = new JobConf();
jobConf.set(DynamoDBConstants.THROUGHPUT_READ_PERCENT, String.valueOf(THROUGHPUT_READ_PERCENT));
- when(jobClient.getConf()).thenReturn(jobConf);
+ doReturn(jobConf).when(jobClient).getConf();
- readIopsCalculator = new ReadIopsCalculator(jobClient, dynamoDBClient, TABLE_NAME,
+ return new ReadIopsCalculator(jobClient, dynamoDBClient, TABLE_NAME,
TOTAL_SEGMETNS, LOCAL_SEGMENTS);
}
-
- @Test
- public void testCalculateTargetIops() {
- long readIops = readIopsCalculator.calculateTargetIops();
- long expectedReadIops = (long) (READ_CAPACITY_UNITS * THROUGHPUT_READ_PERCENT *
- LOCAL_SEGMENTS / TOTAL_SEGMETNS);
- assertEquals(expectedReadIops, readIops);
- }
-
}
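A note on the stubbing idiom: the tests use `doReturn(...).when(spy)` rather than `when(spy.getThroughput())` because, on a Mockito spy, the latter invokes the real method during stubbing, which here would hit the mocked DynamoDB client. A minimal illustration with a hypothetical class:

```java
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

public class SpyStubbingSketch {
  // Hypothetical stand-in for ReadIopsCalculator.
  static class Calculator {
    public double getThroughput() { throw new IllegalStateException("would call DynamoDB"); }
    public long calculateTargetIops() { return (long) (getThroughput() * 0.5); }
  }

  public static void main(String[] args) {
    Calculator calc = spy(new Calculator());
    // Stubs without running the real method; when(calc.getThroughput())
    // would throw here because the real getThroughput() executes first.
    doReturn(1000d).when(calc).getThroughput();
    System.out.println(calc.calculateTargetIops()); // 500
  }
}
```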
diff --git a/emr-dynamodb-hadoop/src/test/java/org/apache/hadoop/dynamodb/write/WriteIopsCalculatorTest.java b/emr-dynamodb-hadoop/src/test/java/org/apache/hadoop/dynamodb/write/WriteIopsCalculatorTest.java
index 0e7899dc..7f4e0c37 100644
--- a/emr-dynamodb-hadoop/src/test/java/org/apache/hadoop/dynamodb/write/WriteIopsCalculatorTest.java
+++ b/emr-dynamodb-hadoop/src/test/java/org/apache/hadoop/dynamodb/write/WriteIopsCalculatorTest.java
@@ -14,16 +14,19 @@
package org.apache.hadoop.dynamodb.write;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.apache.hadoop.dynamodb.DynamoDBClient;
import org.apache.hadoop.dynamodb.DynamoDBConstants;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
-import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.BillingModeSummary;
@@ -45,39 +48,62 @@ public class WriteIopsCalculatorTest {
private WriteIopsCalculator writeIopsCalculator;
- @Before
- public void setup() {
+ @Test
+ public void testCalculateTargetIops() {
+ JobConf jobConf = new JobConf();
+ writeIopsCalculator = getWriteIopsCalculator(jobConf);
+
+ long writeIops = writeIopsCalculator.calculateTargetIops();
+ long expectedWriteIops = (long) (WRITE_CAPACITY_UNITS * THROUGHPUT_WRITE_PERCENT / Math.min
+ (MAX_CONCURRENT_MAP_TASKS, TOTAL_MAP_TASKS));
+ assertEquals(expectedWriteIops, writeIops);
+ }
+
+ @Test
+ public void testCalculateIopsAutoscalingEnabled() {
+ JobConf jobConf = new JobConf();
+ jobConf.set(DynamoDBConstants.WRITE_THROUGHPUT, "500");
+ writeIopsCalculator = getWriteIopsCalculator(jobConf);
+
+ WriteIopsCalculator spyIopsCalculator = Mockito.spy(writeIopsCalculator);
+ doReturn((double) 1000).when(spyIopsCalculator).getThroughput();
+ long writeIops = spyIopsCalculator.calculateTargetIops();
+ long expectedWriteIops = (long) (500 * THROUGHPUT_WRITE_PERCENT / Math.min
+ (MAX_CONCURRENT_MAP_TASKS, TOTAL_MAP_TASKS));
+ assertEquals(expectedWriteIops, writeIops);
+ // Auto-scaling not enabled: throughput shouldn't be fetched when the user has specified it
+ verify(spyIopsCalculator, times(0)).getThroughput();
+
+ jobConf.set(DynamoDBConstants.WRITE_THROUGHPUT_AUTOSCALING, "true");
+ long writeIops2 = spyIopsCalculator.calculateTargetIops();
+ long expectedWriteIops2 = (long) (1000 * THROUGHPUT_WRITE_PERCENT / Math.min
+ (MAX_CONCURRENT_MAP_TASKS, TOTAL_MAP_TASKS));
+ assertEquals(expectedWriteIops2, writeIops2);
+ // Auto-scaling enabled: throughput should be fetched regardless of whether the user specified it
+ verify(spyIopsCalculator, times(1)).getThroughput();
+ }
+
+ private WriteIopsCalculator getWriteIopsCalculator(JobConf jobConf) {
when(dynamoDBClient.describeTable(TABLE_NAME)).thenReturn(TableDescription.builder()
- .billingModeSummary(
- BillingModeSummary.builder()
+ .billingModeSummary(BillingModeSummary.builder()
.billingMode(BillingMode.PROVISIONED)
.build())
- .provisionedThroughput(
- ProvisionedThroughputDescription.builder()
- .writeCapacityUnits(WRITE_CAPACITY_UNITS)
- .build())
+ .provisionedThroughput(ProvisionedThroughputDescription.builder()
+ .writeCapacityUnits(WRITE_CAPACITY_UNITS)
+ .build())
.build());
- JobConf jobConf = new JobConf();
jobConf.setNumMapTasks(TOTAL_MAP_TASKS);
jobConf.set("mapreduce.task.attempt.id", "attempt_m_1");
- jobConf.set(DynamoDBConstants.THROUGHPUT_WRITE_PERCENT, String.valueOf
- (THROUGHPUT_WRITE_PERCENT));
+ jobConf.set(DynamoDBConstants.THROUGHPUT_WRITE_PERCENT,
+ String.valueOf(THROUGHPUT_WRITE_PERCENT));
when(jobClient.getConf()).thenReturn(jobConf);
- writeIopsCalculator = new WriteIopsCalculator(jobClient, dynamoDBClient, TABLE_NAME) {
+ return new WriteIopsCalculator(jobClient, dynamoDBClient, TABLE_NAME) {
@Override
int calculateMaxMapTasks(int totalMapTasks) {
return MAX_CONCURRENT_MAP_TASKS;
}
};
}
-
- @Test
- public void testCalculateTargetIops() {
- long writeIops = writeIopsCalculator.calculateTargetIops();
- long expectedWriteIops = (long) (WRITE_CAPACITY_UNITS * THROUGHPUT_WRITE_PERCENT / Math.min
- (MAX_CONCURRENT_MAP_TASKS, TOTAL_MAP_TASKS));
- assertEquals(expectedWriteIops, writeIops);
- }
}
diff --git a/emr-dynamodb-hive/pom.xml b/emr-dynamodb-hive/pom.xml
index 83240732..e74c75c1 100644
--- a/emr-dynamodb-hive/pom.xml
+++ b/emr-dynamodb-hive/pom.xml
@@ -112,6 +112,12 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+
diff --git a/emr-dynamodb-hive/src/main/java/org/apache/hadoop/hive/dynamodb/DynamoDBStorageHandler.java b/emr-dynamodb-hive/src/main/java/org/apache/hadoop/hive/dynamodb/DynamoDBStorageHandler.java
index fb295546..e780f03c 100644
--- a/emr-dynamodb-hive/src/main/java/org/apache/hadoop/hive/dynamodb/DynamoDBStorageHandler.java
+++ b/emr-dynamodb-hive/src/main/java/org/apache/hadoop/hive/dynamodb/DynamoDBStorageHandler.java
@@ -119,8 +119,7 @@ public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer dese
@Override
public void configureTableJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
- DynamoDBClient client =
- new DynamoDBClient(conf, tableDesc.getProperties().getProperty(DynamoDBConstants.REGION));
+ DynamoDBClient client = createDynamoDBClient(tableDesc);
try {
String tableName = HiveDynamoDBUtil.getDynamoDBTableName(tableDesc.getProperties()
@@ -191,7 +190,22 @@ public void configureTableJobProperties(TableDesc tableDesc, Map<String, String>
if (description.billingModeSummary() == null
|| description.billingModeSummary().billingMode() == BillingMode.PROVISIONED) {
- useExplicitThroughputIfRequired(jobProperties, tableDesc);
+ // If not specified at the table level, get initial read/write capacity from DDB
+ jobProperties.put(DynamoDBConstants.READ_THROUGHPUT, tableDesc.getProperties()
+ .getProperty(DynamoDBConstants.READ_THROUGHPUT,
+ description.provisionedThroughput().readCapacityUnits().toString()));
+ jobProperties.put(DynamoDBConstants.WRITE_THROUGHPUT, tableDesc.getProperties()
+ .getProperty(DynamoDBConstants.WRITE_THROUGHPUT,
+ description.provisionedThroughput().writeCapacityUnits().toString()));
+ // Assume auto-scaling enabled for PROVISIONED tables if throughput not specified by user
+ if (tableDesc.getProperties().getProperty(DynamoDBConstants.READ_THROUGHPUT) == null) {
+ jobProperties.put(DynamoDBConstants.READ_THROUGHPUT_AUTOSCALING,
+ DynamoDBConstants.DEFAULT_THROUGHPUT_AUTOSCALING);
+ }
+ if (tableDesc.getProperties().getProperty(DynamoDBConstants.WRITE_THROUGHPUT) == null) {
+ jobProperties.put(DynamoDBConstants.WRITE_THROUGHPUT_AUTOSCALING,
+ DynamoDBConstants.DEFAULT_THROUGHPUT_AUTOSCALING);
+ }
} else {
// If not specified at the table level, set default value
jobProperties.put(DynamoDBConstants.READ_THROUGHPUT, tableDesc.getProperties()
@@ -222,21 +236,6 @@ public void configureInputJobCredentials(TableDesc tableDesc, Map<String, String> jobProperties,
- private void useExplicitThroughputIfRequired(Map<String, String> jobProperties,
- TableDesc tableDesc) {
- String userRequiredReadThroughput =
- tableDesc.getProperties().getProperty(DynamoDBConstants.READ_THROUGHPUT);
- if (userRequiredReadThroughput != null) {
- jobProperties.put(DynamoDBConstants.READ_THROUGHPUT, userRequiredReadThroughput);
- }
-
- String userRequiredWriteThroughput =
- tableDesc.getProperties().getProperty(DynamoDBConstants.WRITE_THROUGHPUT);
- if (userRequiredWriteThroughput != null) {
- jobProperties.put(DynamoDBConstants.WRITE_THROUGHPUT, userRequiredWriteThroughput);
- }
- }
-
@Override
public Class<? extends InputFormat> getInputFormatClass() {
return HiveDynamoDBInputFormat.class;
@@ -375,11 +374,16 @@ void checkTableSchemaType(TableDescription tableDescription, Table table) throws
}
}
- private DynamoDBClient createDynamoDBClient(Table table) {
+ protected DynamoDBClient createDynamoDBClient(Table table) {
String region = table.getParameters().get(DynamoDBConstants.REGION);
return new DynamoDBClient(conf, region);
}
+ protected DynamoDBClient createDynamoDBClient(TableDesc tableDesc) {
+ String region = tableDesc.getProperties().getProperty(DynamoDBConstants.REGION);
+ return new DynamoDBClient(conf, region);
+ }
+
private void checkTableStatus(TableDescription tableDescription) throws MetaException {
String status = tableDescription.tableStatusAsString();
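The PROVISIONED branch above encodes a precedence rule: an explicit table property always wins, and the auto-scaling flag is set only when the user stayed silent. A condensed restatement of the read half, as a hypothetical helper (simplified; not the handler's actual code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.dynamodb.DynamoDBConstants;

public class ThroughputResolutionSketch {
  // Hypothetical condensation of the handler's PROVISIONED branch.
  static void resolveRead(Properties tableProps, Map<String, String> jobProps,
      long describedReadCapacity) {
    String userValue = tableProps.getProperty(DynamoDBConstants.READ_THROUGHPUT);
    jobProps.put(DynamoDBConstants.READ_THROUGHPUT,
        userValue != null ? userValue : String.valueOf(describedReadCapacity));
    if (userValue == null) {
      // No explicit setting: assume auto-scaling and re-resolve at task time.
      jobProps.put(DynamoDBConstants.READ_THROUGHPUT_AUTOSCALING,
          DynamoDBConstants.DEFAULT_THROUGHPUT_AUTOSCALING);
    }
  }

  public static void main(String[] args) {
    Map<String, String> jobProps = new HashMap<>();
    resolveRead(new Properties(), jobProps, 400L);
    // No user override: throughput 400 from DescribeTable, autoscaling "true".
    System.out.println(jobProps);
  }
}
```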
diff --git a/emr-dynamodb-hive/src/test/java/org/apache/hadoop/hive/dynamodb/DynamoDBStorageHandlerTest.java b/emr-dynamodb-hive/src/test/java/org/apache/hadoop/hive/dynamodb/DynamoDBStorageHandlerTest.java
index 4f50111b..8d650ab2 100644
--- a/emr-dynamodb-hive/src/test/java/org/apache/hadoop/hive/dynamodb/DynamoDBStorageHandlerTest.java
+++ b/emr-dynamodb-hive/src/test/java/org/apache/hadoop/hive/dynamodb/DynamoDBStorageHandlerTest.java
@@ -13,11 +13,14 @@
package org.apache.hadoop.hive.dynamodb;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dynamodb.DynamoDBClient;
import org.apache.hadoop.dynamodb.DynamoDBConstants;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -26,13 +29,26 @@
import com.google.common.collect.Maps;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
+
+import org.mockito.Mockito;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.BillingModeSummary;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputDescription;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.awssdk.services.dynamodb.model.TableDescription;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+
public class DynamoDBStorageHandlerTest {
@Rule
@@ -42,6 +58,7 @@ public class DynamoDBStorageHandlerTest {
@Before
public void setup() {
storageHandler = new DynamoDBStorageHandler();
+ storageHandler.setConf(new Configuration());
}
@Test
@@ -309,7 +326,87 @@ public void testCheckTableSchemaNullSerializationValid() throws MetaException {
storageHandler.checkTableSchemaType(description, table);
}
+ @Test
+ public void testReadWriteThroughputConfiguredProvisionedTable() {
+ DynamoDBClient mockDynamoClient = Mockito.mock(DynamoDBClient.class);
+ TableDescription mockDescribeTableResponse =
+ getHashRangeTableByBillingMode(BillingMode.PROVISIONED);
+ doReturn(mockDescribeTableResponse).when(mockDynamoClient).describeTable(any());
+
+ DynamoDBStorageHandler spyDynamoStorageHandler = Mockito.spy(storageHandler);
+ doReturn(mockDynamoClient).when(spyDynamoStorageHandler).createDynamoDBClient((TableDesc) any());
+
+ TableDesc testTableDesc = new TableDesc();
+ Properties properties = new Properties();
+ properties.setProperty(DynamoDBConstants.REGION, "us-east-1");
+ properties.setProperty(DynamoDBConstants.TABLE_NAME, "test-table");
+ testTableDesc.setProperties(properties);
+
+ // User has not specified explicit throughput settings; fetch them from the DynamoDB table
+ Map<String, String> jobProperties1 = new HashMap<>();
+ spyDynamoStorageHandler.configureTableJobProperties(testTableDesc, jobProperties1);
+ assertEquals(jobProperties1.get(DynamoDBConstants.READ_THROUGHPUT), "400");
+ assertEquals(jobProperties1.get(DynamoDBConstants.WRITE_THROUGHPUT), "100");
+ // Provisioned tables should be configured to dynamically calculate throughput during tasks
+ assertTrue(Boolean.parseBoolean(jobProperties1.get(DynamoDBConstants.READ_THROUGHPUT_AUTOSCALING)));
+ assertTrue(Boolean.parseBoolean(jobProperties1.get(DynamoDBConstants.WRITE_THROUGHPUT_AUTOSCALING)));
+
+ // User has specified throughput settings; override what is in the table
+ properties.setProperty(DynamoDBConstants.READ_THROUGHPUT, "50");
+ properties.setProperty(DynamoDBConstants.WRITE_THROUGHPUT, "50");
+
+ Map<String, String> jobProperties2 = new HashMap<>();
+ spyDynamoStorageHandler.configureTableJobProperties(testTableDesc, jobProperties2);
+ assertEquals(jobProperties2.get(DynamoDBConstants.READ_THROUGHPUT), "50");
+ assertEquals(jobProperties2.get(DynamoDBConstants.WRITE_THROUGHPUT), "50");
+ // Provisioned tables with user specified settings should not dynamically configure throughput
+ assertFalse(Boolean.parseBoolean(jobProperties2.get(DynamoDBConstants.READ_THROUGHPUT_AUTOSCALING)));
+ assertFalse(Boolean.parseBoolean(jobProperties2.get(DynamoDBConstants.WRITE_THROUGHPUT_AUTOSCALING)));
+ }
+
+ @Test
+ public void testReadWriteThroughputConfiguredOnDemandTable() {
+ DynamoDBClient mockDynamoClient = Mockito.mock(DynamoDBClient.class);
+ TableDescription mockDescribeTableResponse =
+ getHashRangeTableByBillingMode(BillingMode.PAY_PER_REQUEST);
+ doReturn(mockDescribeTableResponse).when(mockDynamoClient).describeTable(any());
+
+ DynamoDBStorageHandler spyDynamoStorageHandler = Mockito.spy(storageHandler);
+ doReturn(mockDynamoClient).when(spyDynamoStorageHandler).createDynamoDBClient((TableDesc) any());
+
+ TableDesc testTableDesc = new TableDesc();
+ Properties properties = new Properties();
+ properties.setProperty(DynamoDBConstants.REGION, "us-east-1");
+ properties.setProperty(DynamoDBConstants.TABLE_NAME, "test-table");
+ testTableDesc.setProperties(properties);
+
+ // User has not specified explicit throughput settings; on-demand defaults are set
+ Map<String, String> jobProperties1 = new HashMap<>();
+ spyDynamoStorageHandler.configureTableJobProperties(testTableDesc, jobProperties1);
+ assertEquals(jobProperties1.get(DynamoDBConstants.READ_THROUGHPUT), DynamoDBConstants.DEFAULT_CAPACITY_FOR_ON_DEMAND.toString());
+ assertEquals(jobProperties1.get(DynamoDBConstants.WRITE_THROUGHPUT), DynamoDBConstants.DEFAULT_CAPACITY_FOR_ON_DEMAND.toString());
+ // On demand tables should never dynamically configure throughput
+ assertFalse(Boolean.parseBoolean(jobProperties1.get(DynamoDBConstants.READ_THROUGHPUT_AUTOSCALING)));
+ assertFalse(Boolean.parseBoolean(jobProperties1.get(DynamoDBConstants.WRITE_THROUGHPUT_AUTOSCALING)));
+
+ // User has specified throughput settings
+ properties.setProperty(DynamoDBConstants.READ_THROUGHPUT, "50");
+ properties.setProperty(DynamoDBConstants.WRITE_THROUGHPUT, "50");
+
+ Map<String, String> jobProperties2 = new HashMap<>();
+ spyDynamoStorageHandler.configureTableJobProperties(testTableDesc, jobProperties2);
+ assertEquals(jobProperties2.get(DynamoDBConstants.READ_THROUGHPUT), "50");
+ assertEquals(jobProperties2.get(DynamoDBConstants.WRITE_THROUGHPUT), "50");
+ assertFalse(Boolean.parseBoolean(jobProperties2.get(DynamoDBConstants.READ_THROUGHPUT_AUTOSCALING)));
+ assertFalse(Boolean.parseBoolean(jobProperties2.get(DynamoDBConstants.WRITE_THROUGHPUT_AUTOSCALING)));
+ }
+
+
private TableDescription getHashRangeTable() {
+ return getHashRangeTableByBillingMode(BillingMode.PROVISIONED);
+ }
+
+ private TableDescription getHashRangeTableByBillingMode(BillingMode billingMode) {
TableDescription description = TableDescription.builder()
.keySchema(Arrays.asList(
KeySchemaElement.builder().attributeName("hashKey").build(),
@@ -323,6 +420,15 @@ private TableDescription getHashRangeTable() {
.attributeName("rangeKey")
.attributeType(ScalarAttributeType.N)
.build()))
+ .provisionedThroughput(ProvisionedThroughputDescription.builder()
+ .readCapacityUnits(400L)
+ .writeCapacityUnits(100L)
+ .build())
+ .billingModeSummary(BillingModeSummary.builder()
+ .billingMode(billingMode)
+ .build())
+ .itemCount(0L)
+ .tableSizeBytes(0L)
.build();
return description;
}
diff --git a/emr-dynamodb-tools/src/main/java/org/apache/hadoop/dynamodb/tools/DynamoDBExport.java b/emr-dynamodb-tools/src/main/java/org/apache/hadoop/dynamodb/tools/DynamoDBExport.java
index 8058f090..c971cd61 100644
--- a/emr-dynamodb-tools/src/main/java/org/apache/hadoop/dynamodb/tools/DynamoDBExport.java
+++ b/emr-dynamodb-tools/src/main/java/org/apache/hadoop/dynamodb/tools/DynamoDBExport.java
@@ -117,6 +117,11 @@ private void setTableProperties(JobConf jobConf, String tableName, Double readRa
description.provisionedThroughput().readCapacityUnits().toString());
jobConf.set(DynamoDBConstants.WRITE_THROUGHPUT,
description.provisionedThroughput().writeCapacityUnits().toString());
+ // Assume auto-scaling enabled for PROVISIONED tables
+ jobConf.set(DynamoDBConstants.READ_THROUGHPUT_AUTOSCALING,
+ DynamoDBConstants.DEFAULT_THROUGHPUT_AUTOSCALING);
+ jobConf.set(DynamoDBConstants.WRITE_THROUGHPUT_AUTOSCALING,
+ DynamoDBConstants.DEFAULT_THROUGHPUT_AUTOSCALING);
} else {
// If not specified at the table level, set a hard coded value of 40,000
jobConf.set(DynamoDBConstants.READ_THROUGHPUT,
diff --git a/emr-dynamodb-tools/src/main/java/org/apache/hadoop/dynamodb/tools/DynamoDBImport.java b/emr-dynamodb-tools/src/main/java/org/apache/hadoop/dynamodb/tools/DynamoDBImport.java
index be5e6c05..e8ea01fa 100644
--- a/emr-dynamodb-tools/src/main/java/org/apache/hadoop/dynamodb/tools/DynamoDBImport.java
+++ b/emr-dynamodb-tools/src/main/java/org/apache/hadoop/dynamodb/tools/DynamoDBImport.java
@@ -100,6 +100,11 @@ private void setTableProperties(JobConf jobConf, String tableName, Double writeR
description.provisionedThroughput().readCapacityUnits().toString());
jobConf.set(DynamoDBConstants.WRITE_THROUGHPUT,
description.provisionedThroughput().writeCapacityUnits().toString());
+ // Assume auto-scaling enabled for PROVISIONED tables
+ jobConf.set(DynamoDBConstants.READ_THROUGHPUT_AUTOSCALING,
+ DynamoDBConstants.DEFAULT_THROUGHPUT_AUTOSCALING);
+ jobConf.set(DynamoDBConstants.WRITE_THROUGHPUT_AUTOSCALING,
+ DynamoDBConstants.DEFAULT_THROUGHPUT_AUTOSCALING);
} else {
jobConf.set(DynamoDBConstants.READ_THROUGHPUT,
DynamoDBConstants.DEFAULT_CAPACITY_FOR_ON_DEMAND.toString());
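Net effect of the two tool changes for a PROVISIONED table, as a hypothetical restatement of the configuration the Export/Import jobs now carry (capacity values illustrative, as returned by DescribeTable):

```java
import org.apache.hadoop.dynamodb.DynamoDBConstants;
import org.apache.hadoop.mapred.JobConf;

public class ToolDefaultsSketch {
  public static void main(String[] args) {
    JobConf jobConf = new JobConf();
    // Snapshot of the table's provisioned capacity at submission time...
    jobConf.set(DynamoDBConstants.READ_THROUGHPUT, "400");
    jobConf.set(DynamoDBConstants.WRITE_THROUGHPUT, "100");
    // ...plus the new assumption that the table may auto-scale, so the
    // calculators re-fetch capacity during the job rather than trust it.
    jobConf.set(DynamoDBConstants.READ_THROUGHPUT_AUTOSCALING,
        DynamoDBConstants.DEFAULT_THROUGHPUT_AUTOSCALING);
    jobConf.set(DynamoDBConstants.WRITE_THROUGHPUT_AUTOSCALING,
        DynamoDBConstants.DEFAULT_THROUGHPUT_AUTOSCALING);
  }
}
```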