diff --git a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBConstants.java b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBConstants.java
index 4d951913..80bc0f47 100644
--- a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBConstants.java
+++ b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBConstants.java
@@ -60,11 +60,14 @@ public interface DynamoDBConstants {
   String THROUGHPUT_READ_PERCENT = "dynamodb.throughput.read.percent";
   String READ_THROUGHPUT = "dynamodb.throughput.read";
   String WRITE_THROUGHPUT = "dynamodb.throughput.write";
+  String READ_THROUGHPUT_AUTOSCALING = "dynamodb.throughput.read.autoscaling";
+  String WRITE_THROUGHPUT_AUTOSCALING = "dynamodb.throughput.write.autoscaling";
   String AVG_ITEM_SIZE = "dynamodb.item.average.size";
   String ITEM_COUNT = "dynamodb.item.count";
   String TABLE_SIZE_BYTES = "dynamodb.table.size-bytes";
   String MAX_MAP_TASKS = "dynamodb.max.map.tasks";
   String DEFAULT_THROUGHPUT_PERCENTAGE = "0.5";
+  String DEFAULT_THROUGHPUT_AUTOSCALING = "true";
 
   String BILLING_MODE_PROVISIONED = BillingMode.PROVISIONED.toString();
   String DYNAMODB_MAX_ITEM_SIZE = "dynamodb.max.item.size";
diff --git a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/read/ReadIopsCalculator.java b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/read/ReadIopsCalculator.java
index 7634d965..95f83b78 100644
--- a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/read/ReadIopsCalculator.java
+++ b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/read/ReadIopsCalculator.java
@@ -55,17 +55,24 @@ public ReadIopsCalculator(JobClient jobClient, DynamoDBClient dynamoDBClient, St
   }
 
   public long calculateTargetIops() {
-    double configuredThroughput = Math.floor(Double.parseDouble(
-        jobConf.get(DynamoDBConstants.READ_THROUGHPUT,
-            String.valueOf(getThroughput()))) * throughputPercent);
-    long throughputPerTask = Math.max((long) (configuredThroughput / totalSegments
+    double configuredThroughput;
+    // Always fetch throughput from DDB if auto-scaling is enabled or no throughput is specified
+    if (Boolean.parseBoolean(jobConf.get(DynamoDBConstants.READ_THROUGHPUT_AUTOSCALING))
+        || jobConf.get(DynamoDBConstants.READ_THROUGHPUT) == null) {
+      configuredThroughput = getThroughput();
+    } else {
+      configuredThroughput = Double.parseDouble(jobConf.get(DynamoDBConstants.READ_THROUGHPUT));
+    }
+    double calculatedThroughput = Math.floor(configuredThroughput * throughputPercent);
+
+    long throughputPerTask = Math.max((long) (calculatedThroughput / totalSegments
         * localSegments), 1);
 
     log.info("Throughput per task for table " + tableName + " : " + throughputPerTask);
     return throughputPerTask;
   }
 
-  private double getThroughput() {
+  protected double getThroughput() {
     TableDescription tableDescription = dynamoDBClient.describeTable(tableName);
     if (tableDescription.billingModeSummary() == null
         || tableDescription.billingModeSummary().billingMode() == BillingMode.PROVISIONED) {
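The effect of the new read branch is easiest to see end to end: when `dynamodb.throughput.read.autoscaling` is set, or `dynamodb.throughput.read` is absent, every call to `calculateTargetIops()` goes through `getThroughput()` and re-describes the table, so a long-running job tracks capacity changes made by DynamoDB auto-scaling. A rough sketch under assumed numbers (the table name, segment counts, and scaling event are hypothetical; 0.5 is the connector's default `dynamodb.throughput.read.percent`):

```java
// Illustrative sketch; jobClient and dynamoDBClient are the job's existing
// JobClient and DynamoDBClient instances, and "my-table" is a made-up name.
ReadIopsCalculator calculator =
    new ReadIopsCalculator(jobClient, dynamoDBClient, "my-table",
        10 /* totalSegments */, 1 /* localSegments */);

long perTaskBefore = calculator.calculateTargetIops();
// Table at 1000 RCU: floor(1000 * 0.5) / 10 segments * 1 local segment = 50 IOPS per task

// ... auto-scaling raises the table to 2000 RCU mid-job ...

long perTaskAfter = calculator.calculateTargetIops();
// getThroughput() re-describes the table: floor(2000 * 0.5) / 10 * 1 = 100 IOPS per task
```

The trade-off is one DescribeTable call per recalculation, which is why the user-pinned path skips the lookup entirely.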
diff --git a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/write/WriteIopsCalculator.java b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/write/WriteIopsCalculator.java
index df415769..d32685db 100644
--- a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/write/WriteIopsCalculator.java
+++ b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/write/WriteIopsCalculator.java
@@ -70,10 +70,16 @@ public WriteIopsCalculator(JobClient jobClient, DynamoDBClient dynamoDBClient, S
   }
 
   public long calculateTargetIops() {
-    double configuredThroughput = Math.floor(Double.parseDouble(
-        jobConf.get(DynamoDBConstants.WRITE_THROUGHPUT, String.valueOf(getThroughput())))
-        * throughputPercent);
-    long throughputPerTask = Math.max((long) (configuredThroughput / maxParallelTasks), 1);
+    double configuredThroughput;
+    // Always fetch throughput from DDB if auto-scaling is enabled or no throughput is specified
+    if (Boolean.parseBoolean(jobConf.get(DynamoDBConstants.WRITE_THROUGHPUT_AUTOSCALING))
+        || jobConf.get(DynamoDBConstants.WRITE_THROUGHPUT) == null) {
+      configuredThroughput = getThroughput();
+    } else {
+      configuredThroughput = Double.parseDouble(jobConf.get(DynamoDBConstants.WRITE_THROUGHPUT));
+    }
+    double calculatedThroughput = Math.floor(configuredThroughput * throughputPercent);
+    long throughputPerTask = Math.max((long) (calculatedThroughput / maxParallelTasks), 1);
 
     log.info("Throughput per task for table " + tableName + " : " + throughputPerTask);
     return throughputPerTask;
@@ -88,7 +94,7 @@ int calculateMaxMapTasks(int totalMapTasks) {
     return totalMapTasks;
   }
 
-  private double getThroughput() {
+  protected double getThroughput() {
     TableDescription tableDescription = dynamoDBClient.describeTable(tableName);
     if (tableDescription.billingModeSummary() == null
         || tableDescription.billingModeSummary().billingMode() == BillingMode.PROVISIONED) {
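The write path takes the same branch and then divides by concurrent map tasks instead of scan segments. A worked example with assumed numbers, 1000 WCU currently provisioned, the default write percent of 0.5, and 10 parallel tasks, mirroring the lines above:

```java
double configuredThroughput = 1000;  // returned by getThroughput() on the DescribeTable path
double calculatedThroughput = Math.floor(configuredThroughput * 0.5);       // 500.0
long throughputPerTask = Math.max((long) (calculatedThroughput / 10), 1);   // 50
```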
diff --git a/emr-dynamodb-hadoop/src/test/java/org/apache/hadoop/dynamodb/read/ReadIopsCalculatorTest.java b/emr-dynamodb-hadoop/src/test/java/org/apache/hadoop/dynamodb/read/ReadIopsCalculatorTest.java
index 0c6c035e..790bea9d 100644
--- a/emr-dynamodb-hadoop/src/test/java/org/apache/hadoop/dynamodb/read/ReadIopsCalculatorTest.java
+++ b/emr-dynamodb-hadoop/src/test/java/org/apache/hadoop/dynamodb/read/ReadIopsCalculatorTest.java
@@ -14,16 +14,20 @@ package org.apache.hadoop.dynamodb.read;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import org.apache.hadoop.dynamodb.DynamoDBClient;
 import org.apache.hadoop.dynamodb.DynamoDBConstants;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
 import software.amazon.awssdk.services.dynamodb.model.BillingModeSummary;
 import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputDescription;
 import software.amazon.awssdk.services.dynamodb.model.TableDescription;
@@ -43,31 +47,55 @@ public class ReadIopsCalculatorTest {
 
   private ReadIopsCalculator readIopsCalculator;
 
-  @Before
-  public void setup() {
+  @Test
+  public void testCalculateTargetIops() {
+    JobConf jobConf = new JobConf();
+    readIopsCalculator = getReadIopsCalculator(jobConf);
+
+    long readIops = readIopsCalculator.calculateTargetIops();
+    long expectedReadIops = (long) (READ_CAPACITY_UNITS * THROUGHPUT_READ_PERCENT *
+        LOCAL_SEGMENTS / TOTAL_SEGMETNS);
+    assertEquals(expectedReadIops, readIops);
+  }
+
+  @Test
+  public void testCalculateIopsAutoscalingEnabled() {
+    JobConf jobConf = new JobConf();
+    jobConf.set(DynamoDBConstants.READ_THROUGHPUT, "500");
+    readIopsCalculator = getReadIopsCalculator(jobConf);
+
+    ReadIopsCalculator spyIopsCalculator = Mockito.spy(readIopsCalculator);
+    doReturn((double) 1000).when(spyIopsCalculator).getThroughput();
+    long readIops = spyIopsCalculator.calculateTargetIops();
+    long expectedReadIops = (long) (500 * THROUGHPUT_READ_PERCENT *
+        LOCAL_SEGMENTS / TOTAL_SEGMETNS);
+    assertEquals(expectedReadIops, readIops);
+    // Auto-scaling not enabled: throughput shouldn't be fetched when the user has specified it
+    verify(spyIopsCalculator, times(0)).getThroughput();
+
+    jobConf.set(DynamoDBConstants.READ_THROUGHPUT_AUTOSCALING, "true");
+    long readIops2 = spyIopsCalculator.calculateTargetIops();
+    long expectedReadIops2 = (long) (1000 * THROUGHPUT_READ_PERCENT *
+        LOCAL_SEGMENTS / TOTAL_SEGMETNS);
+    assertEquals(expectedReadIops2, readIops2);
+    // Auto-scaling enabled: throughput should be fetched regardless of the user's setting
+    verify(spyIopsCalculator, times(1)).getThroughput();
+  }
+
+  private ReadIopsCalculator getReadIopsCalculator(JobConf jobConf) {
     when(dynamoDBClient.describeTable(TABLE_NAME)).thenReturn(TableDescription.builder()
         .billingModeSummary(BillingModeSummary.builder()
-            .billingMode(DynamoDBConstants.BILLING_MODE_PROVISIONED)
+            .billingMode(BillingMode.PROVISIONED)
             .build())
         .provisionedThroughput(ProvisionedThroughputDescription.builder()
            .readCapacityUnits(READ_CAPACITY_UNITS)
            .build())
        .build());

    jobConf.set(DynamoDBConstants.THROUGHPUT_READ_PERCENT,
        String.valueOf(THROUGHPUT_READ_PERCENT));
-    when(jobClient.getConf()).thenReturn(jobConf);
+    doReturn(jobConf).when(jobClient).getConf();

-    readIopsCalculator = new ReadIopsCalculator(jobClient, dynamoDBClient, TABLE_NAME,
+    return new ReadIopsCalculator(jobClient, dynamoDBClient, TABLE_NAME,
        TOTAL_SEGMETNS, LOCAL_SEGMENTS);
  }
-
-  @Test
-  public void testCalculateTargetIops() {
-    long readIops = readIopsCalculator.calculateTargetIops();
-    long expectedReadIops = (long) (READ_CAPACITY_UNITS * THROUGHPUT_READ_PERCENT *
-        LOCAL_SEGMENTS / TOTAL_SEGMETNS);
-    assertEquals(expectedReadIops, readIops);
-  }
-
 }
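A note on the test mechanics, since the same pattern repeats in the write test below: `Mockito.spy()` wraps the real calculator, so `calculateTargetIops()` exercises the real branching while the now-`protected getThroughput()` (the reason for the visibility change above) is stubbed. The `doReturn(...).when(spy)` form matters here: `when(spy.getThroughput())` would invoke the real method once during stubbing, issuing a real DescribeTable call against the mock. A condensed sketch of that stubbing, with assumed values:

```java
// Condensed form of the pattern used in the tests above.
ReadIopsCalculator spy = Mockito.spy(getReadIopsCalculator(new JobConf()));
doReturn(1000.0).when(spy).getThroughput();  // safe: the real getThroughput() never runs
```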
diff --git a/emr-dynamodb-hadoop/src/test/java/org/apache/hadoop/dynamodb/write/WriteIopsCalculatorTest.java b/emr-dynamodb-hadoop/src/test/java/org/apache/hadoop/dynamodb/write/WriteIopsCalculatorTest.java
index 0e7899dc..7f4e0c37 100644
--- a/emr-dynamodb-hadoop/src/test/java/org/apache/hadoop/dynamodb/write/WriteIopsCalculatorTest.java
+++ b/emr-dynamodb-hadoop/src/test/java/org/apache/hadoop/dynamodb/write/WriteIopsCalculatorTest.java
@@ -14,16 +14,19 @@ package org.apache.hadoop.dynamodb.write;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import org.apache.hadoop.dynamodb.DynamoDBClient;
 import org.apache.hadoop.dynamodb.DynamoDBConstants;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
 import software.amazon.awssdk.services.dynamodb.model.BillingMode;
 import software.amazon.awssdk.services.dynamodb.model.BillingModeSummary;
@@ -45,39 +48,62 @@ public class WriteIopsCalculatorTest {
 
   private WriteIopsCalculator writeIopsCalculator;
 
-  @Before
-  public void setup() {
+  @Test
+  public void testCalculateTargetIops() {
+    JobConf jobConf = new JobConf();
+    writeIopsCalculator = getWriteIopsCalculator(jobConf);
+
+    long writeIops = writeIopsCalculator.calculateTargetIops();
+    long expectedWriteIops = (long) (WRITE_CAPACITY_UNITS * THROUGHPUT_WRITE_PERCENT / Math.min
+        (MAX_CONCURRENT_MAP_TASKS, TOTAL_MAP_TASKS));
+    assertEquals(expectedWriteIops, writeIops);
+  }
+
+  @Test
+  public void testCalculateIopsAutoscalingEnabled() {
+    JobConf jobConf = new JobConf();
+    jobConf.set(DynamoDBConstants.WRITE_THROUGHPUT, "500");
+    writeIopsCalculator = getWriteIopsCalculator(jobConf);
+
+    WriteIopsCalculator spyIopsCalculator = Mockito.spy(writeIopsCalculator);
+    doReturn((double) 1000).when(spyIopsCalculator).getThroughput();
+    long writeIops = spyIopsCalculator.calculateTargetIops();
+    long expectedWriteIops = (long) (500 * THROUGHPUT_WRITE_PERCENT / Math.min
+        (MAX_CONCURRENT_MAP_TASKS, TOTAL_MAP_TASKS));
+    assertEquals(expectedWriteIops, writeIops);
+    // Auto-scaling not enabled: throughput shouldn't be fetched when the user has specified it
+    verify(spyIopsCalculator, times(0)).getThroughput();
+
+    jobConf.set(DynamoDBConstants.WRITE_THROUGHPUT_AUTOSCALING, "true");
+    long writeIops2 = spyIopsCalculator.calculateTargetIops();
+    long expectedWriteIops2 = (long) (1000 * THROUGHPUT_WRITE_PERCENT / Math.min
+        (MAX_CONCURRENT_MAP_TASKS, TOTAL_MAP_TASKS));
+    assertEquals(expectedWriteIops2, writeIops2);
+    // Auto-scaling enabled: throughput should be fetched regardless of the user's setting
+    verify(spyIopsCalculator, times(1)).getThroughput();
+  }
+
+  private WriteIopsCalculator getWriteIopsCalculator(JobConf jobConf) {
     when(dynamoDBClient.describeTable(TABLE_NAME)).thenReturn(TableDescription.builder()
-        .billingModeSummary(
-            BillingModeSummary.builder()
+        .billingModeSummary(BillingModeSummary.builder()
             .billingMode(BillingMode.PROVISIONED)
             .build())
-        .provisionedThroughput(
-            ProvisionedThroughputDescription.builder()
-                .writeCapacityUnits(WRITE_CAPACITY_UNITS)
-                .build())
+        .provisionedThroughput(ProvisionedThroughputDescription.builder()
+            .writeCapacityUnits(WRITE_CAPACITY_UNITS)
+            .build())
         .build());
 
-    JobConf jobConf = new JobConf();
     jobConf.setNumMapTasks(TOTAL_MAP_TASKS);
     jobConf.set("mapreduce.task.attempt.id", "attempt_m_1");
-    jobConf.set(DynamoDBConstants.THROUGHPUT_WRITE_PERCENT, String.valueOf
-        (THROUGHPUT_WRITE_PERCENT));
+    jobConf.set(DynamoDBConstants.THROUGHPUT_WRITE_PERCENT,
+        String.valueOf(THROUGHPUT_WRITE_PERCENT));
     when(jobClient.getConf()).thenReturn(jobConf);
 
-    writeIopsCalculator = new WriteIopsCalculator(jobClient, dynamoDBClient, TABLE_NAME) {
+    return new WriteIopsCalculator(jobClient, dynamoDBClient, TABLE_NAME) {
       @Override
       int calculateMaxMapTasks(int totalMapTasks) {
        return MAX_CONCURRENT_MAP_TASKS;
      }
    };
  }
-
-  @Test
-  public void testCalculateTargetIops() {
-    long writeIops = writeIopsCalculator.calculateTargetIops();
-    long expectedWriteIops = (long) (WRITE_CAPACITY_UNITS * THROUGHPUT_WRITE_PERCENT / Math.min
-        (MAX_CONCURRENT_MAP_TASKS, TOTAL_MAP_TASKS));
-    assertEquals(expectedWriteIops, writeIops);
-  }
 }
diff --git a/emr-dynamodb-hive/pom.xml b/emr-dynamodb-hive/pom.xml
index 83240732..e74c75c1 100644
--- a/emr-dynamodb-hive/pom.xml
+++ b/emr-dynamodb-hive/pom.xml
@@ -112,6 +112,12 @@
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
diff --git a/emr-dynamodb-hive/src/main/java/org/apache/hadoop/hive/dynamodb/DynamoDBStorageHandler.java b/emr-dynamodb-hive/src/main/java/org/apache/hadoop/hive/dynamodb/DynamoDBStorageHandler.java
index fb295546..e780f03c 100644
--- a/emr-dynamodb-hive/src/main/java/org/apache/hadoop/hive/dynamodb/DynamoDBStorageHandler.java
+++ b/emr-dynamodb-hive/src/main/java/org/apache/hadoop/hive/dynamodb/DynamoDBStorageHandler.java
@@ -119,8 +119,7 @@ public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer dese
 
   @Override
   public void configureTableJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
-    DynamoDBClient client =
-        new DynamoDBClient(conf, tableDesc.getProperties().getProperty(DynamoDBConstants.REGION));
+    DynamoDBClient client = createDynamoDBClient(tableDesc);
 
     try {
       String tableName = HiveDynamoDBUtil.getDynamoDBTableName(tableDesc.getProperties()
@@ -191,7 +190,22 @@ public void configureTableJobProperties(TableDesc tableDesc, Map<String, String>
 
       if (description.billingModeSummary() == null
           || description.billingModeSummary().billingMode() == BillingMode.PROVISIONED) {
-        useExplicitThroughputIfRequired(jobProperties, tableDesc);
+        // If not specified at the table level, get initial read/write capacity from DDB
+        jobProperties.put(DynamoDBConstants.READ_THROUGHPUT, tableDesc.getProperties()
+            .getProperty(DynamoDBConstants.READ_THROUGHPUT,
+                description.provisionedThroughput().readCapacityUnits().toString()));
+        jobProperties.put(DynamoDBConstants.WRITE_THROUGHPUT, tableDesc.getProperties()
+            .getProperty(DynamoDBConstants.WRITE_THROUGHPUT,
+                description.provisionedThroughput().writeCapacityUnits().toString()));
+        // Assume auto-scaling enabled for PROVISIONED tables if throughput not specified by user
+        if (tableDesc.getProperties().getProperty(DynamoDBConstants.READ_THROUGHPUT) == null) {
+          jobProperties.put(DynamoDBConstants.READ_THROUGHPUT_AUTOSCALING,
+              DynamoDBConstants.DEFAULT_THROUGHPUT_AUTOSCALING);
+        }
+        if (tableDesc.getProperties().getProperty(DynamoDBConstants.WRITE_THROUGHPUT) == null) {
+          jobProperties.put(DynamoDBConstants.WRITE_THROUGHPUT_AUTOSCALING,
+              DynamoDBConstants.DEFAULT_THROUGHPUT_AUTOSCALING);
+        }
       } else {
         // If not specified at the table level, set default value
         jobProperties.put(DynamoDBConstants.READ_THROUGHPUT, tableDesc.getProperties()
@@ -222,21 +236,6 @@ public void configureInputJobCredentials(TableDesc tableDesc, Map<String, String>
     }
   }
 
-  private void useExplicitThroughputIfRequired(Map<String, String> jobProperties,
-      TableDesc tableDesc) {
-    String userRequiredReadThroughput =
-        tableDesc.getProperties().getProperty(DynamoDBConstants.READ_THROUGHPUT);
-    if (userRequiredReadThroughput != null) {
-      jobProperties.put(DynamoDBConstants.READ_THROUGHPUT, userRequiredReadThroughput);
-    }
-
-    String userRequiredWriteThroughput =
-        tableDesc.getProperties().getProperty(DynamoDBConstants.WRITE_THROUGHPUT);
-    if (userRequiredWriteThroughput != null) {
-      jobProperties.put(DynamoDBConstants.WRITE_THROUGHPUT, userRequiredWriteThroughput);
-    }
-  }
-
   @Override
   public Class<? extends InputFormat> getInputFormatClass() {
     return HiveDynamoDBInputFormat.class;
@@ -375,11 +374,16 @@ void checkTableSchemaType(TableDescription tableDescription, Table table) throws
     }
   }
 
-  private DynamoDBClient createDynamoDBClient(Table table) {
+  protected DynamoDBClient createDynamoDBClient(Table table) {
     String region = table.getParameters().get(DynamoDBConstants.REGION);
     return new DynamoDBClient(conf, region);
   }
 
+  protected DynamoDBClient createDynamoDBClient(TableDesc tableDesc) {
+    String region = tableDesc.getProperties().getProperty(DynamoDBConstants.REGION);
+    return new DynamoDBClient(conf, region);
+  }
+
   private void checkTableStatus(TableDescription tableDescription) throws MetaException {
     String status = tableDescription.tableStatusAsString();
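The PROVISIONED branch above encodes a precedence rule that is easy to miss in the nested `getProperty` calls: an explicit table property always wins and leaves the auto-scaling flag unset, while an absent one is seeded from the DescribeTable result with auto-scaling assumed on. A minimal sketch of that rule, factored into a hypothetical helper that is not part of this PR, called once per direction:

```java
import java.util.Map;
import java.util.Properties;

final class ThroughputDefaults {
  // Hypothetical helper mirroring the PROVISIONED branch of configureTableJobProperties.
  static void resolve(Properties tableProps, Map<String, String> jobProps,
      long describedCapacity, String throughputKey, String autoscalingKey) {
    String userValue = tableProps.getProperty(throughputKey);
    // An explicit table property wins; otherwise seed from the DescribeTable result.
    jobProps.put(throughputKey,
        userValue != null ? userValue : String.valueOf(describedCapacity));
    if (userValue == null) {
      // DEFAULT_THROUGHPUT_AUTOSCALING is "true" per the constants change above.
      jobProps.put(autoscalingKey, "true");
    }
  }
}
```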
diff --git a/emr-dynamodb-hive/src/test/java/org/apache/hadoop/hive/dynamodb/DynamoDBStorageHandlerTest.java b/emr-dynamodb-hive/src/test/java/org/apache/hadoop/hive/dynamodb/DynamoDBStorageHandlerTest.java
index 4f50111b..8d650ab2 100644
--- a/emr-dynamodb-hive/src/test/java/org/apache/hadoop/hive/dynamodb/DynamoDBStorageHandlerTest.java
+++ b/emr-dynamodb-hive/src/test/java/org/apache/hadoop/hive/dynamodb/DynamoDBStorageHandlerTest.java
@@ -13,11 +13,14 @@
 package org.apache.hadoop.hive.dynamodb;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dynamodb.DynamoDBClient;
 import org.apache.hadoop.dynamodb.DynamoDBConstants;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -26,13 +29,26 @@
 import com.google.common.collect.Maps;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
+
+import org.mockito.Mockito;
 import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.BillingModeSummary;
 import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
+import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputDescription;
 import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
 import software.amazon.awssdk.services.dynamodb.model.TableDescription;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+
 public class DynamoDBStorageHandlerTest {
 
   @Rule
@@ -42,6 +58,7 @@ public class DynamoDBStorageHandlerTest {
   @Before
   public void setup() {
     storageHandler = new DynamoDBStorageHandler();
+    storageHandler.setConf(new Configuration());
   }
 
   @Test
@@ -309,7 +326,87 @@ public void testCheckTableSchemaNullSerializationValid() throws MetaException {
     storageHandler.checkTableSchemaType(description, table);
   }
 
+  @Test
+  public void testReadWriteThroughputConfiguredProvisionedTable() {
+    DynamoDBClient mockDynamoClient = Mockito.mock(DynamoDBClient.class);
+    TableDescription mockDescribeTableResponse =
+        getHashRangeTableByBillingMode(BillingMode.PROVISIONED);
+    doReturn(mockDescribeTableResponse).when(mockDynamoClient).describeTable(any());
+
+    DynamoDBStorageHandler spyDynamoStorageHandler = Mockito.spy(storageHandler);
+    doReturn(mockDynamoClient).when(spyDynamoStorageHandler).createDynamoDBClient((TableDesc) any());
+
+    TableDesc testTableDesc = new TableDesc();
+    Properties properties = new Properties();
+    properties.setProperty(DynamoDBConstants.REGION, "us-east-1");
+    properties.setProperty(DynamoDBConstants.TABLE_NAME, "test-table");
+    testTableDesc.setProperties(properties);
+
+    // User has not specified explicit throughput settings; fetch from the DynamoDB table
+    Map<String, String> jobProperties1 = new HashMap<>();
+    spyDynamoStorageHandler.configureTableJobProperties(testTableDesc, jobProperties1);
+    assertEquals(jobProperties1.get(DynamoDBConstants.READ_THROUGHPUT), "400");
+    assertEquals(jobProperties1.get(DynamoDBConstants.WRITE_THROUGHPUT), "100");
+    // Provisioned tables should be configured to dynamically calculate throughput during tasks
+    assertTrue(Boolean.parseBoolean(jobProperties1.get(DynamoDBConstants.READ_THROUGHPUT_AUTOSCALING)));
+    assertTrue(Boolean.parseBoolean(jobProperties1.get(DynamoDBConstants.WRITE_THROUGHPUT_AUTOSCALING)));
+
+    // User has specified throughput settings; override what is in the table
+    properties.setProperty(DynamoDBConstants.READ_THROUGHPUT, "50");
+    properties.setProperty(DynamoDBConstants.WRITE_THROUGHPUT, "50");
+
+    Map<String, String> jobProperties2 = new HashMap<>();
+    spyDynamoStorageHandler.configureTableJobProperties(testTableDesc, jobProperties2);
+    assertEquals(jobProperties2.get(DynamoDBConstants.READ_THROUGHPUT), "50");
+    assertEquals(jobProperties2.get(DynamoDBConstants.WRITE_THROUGHPUT), "50");
+    // Provisioned tables with user-specified settings should not dynamically configure throughput
+    assertFalse(Boolean.parseBoolean(jobProperties2.get(DynamoDBConstants.READ_THROUGHPUT_AUTOSCALING)));
+    assertFalse(Boolean.parseBoolean(jobProperties2.get(DynamoDBConstants.WRITE_THROUGHPUT_AUTOSCALING)));
+  }
+
+  @Test
+  public void testReadWriteThroughputConfiguredOnDemandTable() {
+    DynamoDBClient mockDynamoClient = Mockito.mock(DynamoDBClient.class);
+    TableDescription mockDescribeTableResponse =
+        getHashRangeTableByBillingMode(BillingMode.PAY_PER_REQUEST);
+    doReturn(mockDescribeTableResponse).when(mockDynamoClient).describeTable(any());
+
+    DynamoDBStorageHandler spyDynamoStorageHandler = Mockito.spy(storageHandler);
+    doReturn(mockDynamoClient).when(spyDynamoStorageHandler).createDynamoDBClient((TableDesc) any());
+
+    TableDesc testTableDesc = new TableDesc();
+    Properties properties = new Properties();
+    properties.setProperty(DynamoDBConstants.REGION, "us-east-1");
+    properties.setProperty(DynamoDBConstants.TABLE_NAME, "test-table");
+    testTableDesc.setProperties(properties);
+
+    // User has not specified explicit throughput settings; on-demand defaults are set
+    Map<String, String> jobProperties1 = new HashMap<>();
+    spyDynamoStorageHandler.configureTableJobProperties(testTableDesc, jobProperties1);
+    assertEquals(jobProperties1.get(DynamoDBConstants.READ_THROUGHPUT),
+        DynamoDBConstants.DEFAULT_CAPACITY_FOR_ON_DEMAND.toString());
+    assertEquals(jobProperties1.get(DynamoDBConstants.WRITE_THROUGHPUT),
+        DynamoDBConstants.DEFAULT_CAPACITY_FOR_ON_DEMAND.toString());
+    // On-demand tables should never dynamically configure throughput
+    assertFalse(Boolean.parseBoolean(jobProperties1.get(DynamoDBConstants.READ_THROUGHPUT_AUTOSCALING)));
+    assertFalse(Boolean.parseBoolean(jobProperties1.get(DynamoDBConstants.WRITE_THROUGHPUT_AUTOSCALING)));
+
+    // User has specified throughput settings
+    properties.setProperty(DynamoDBConstants.READ_THROUGHPUT, "50");
+    properties.setProperty(DynamoDBConstants.WRITE_THROUGHPUT, "50");
+
+    Map<String, String> jobProperties2 = new HashMap<>();
+    spyDynamoStorageHandler.configureTableJobProperties(testTableDesc, jobProperties2);
+    assertEquals(jobProperties2.get(DynamoDBConstants.READ_THROUGHPUT), "50");
+    assertEquals(jobProperties2.get(DynamoDBConstants.WRITE_THROUGHPUT), "50");
+    assertFalse(Boolean.parseBoolean(jobProperties2.get(DynamoDBConstants.READ_THROUGHPUT_AUTOSCALING)));
+    assertFalse(Boolean.parseBoolean(jobProperties2.get(DynamoDBConstants.WRITE_THROUGHPUT_AUTOSCALING)));
+  }
+
   private TableDescription getHashRangeTable() {
+    return getHashRangeTableByBillingMode(BillingMode.PROVISIONED);
+  }
+
+  private TableDescription getHashRangeTableByBillingMode(BillingMode billingMode) {
     TableDescription description = TableDescription.builder()
         .keySchema(Arrays.asList(
             KeySchemaElement.builder().attributeName("hashKey").build(),
@@ -323,6 +420,15 @@ private TableDescription getHashRangeTable() {
.attributeName("rangeKey") .attributeType(ScalarAttributeType.N) .build())) + .provisionedThroughput(ProvisionedThroughputDescription.builder() + .readCapacityUnits(400L) + .writeCapacityUnits(100L) + .build()) + .billingModeSummary(BillingModeSummary.builder() + .billingMode(billingMode) + .build()) + .itemCount(0L) + .tableSizeBytes(0L) .build(); return description; } diff --git a/emr-dynamodb-tools/src/main/java/org/apache/hadoop/dynamodb/tools/DynamoDBExport.java b/emr-dynamodb-tools/src/main/java/org/apache/hadoop/dynamodb/tools/DynamoDBExport.java index 8058f090..c971cd61 100644 --- a/emr-dynamodb-tools/src/main/java/org/apache/hadoop/dynamodb/tools/DynamoDBExport.java +++ b/emr-dynamodb-tools/src/main/java/org/apache/hadoop/dynamodb/tools/DynamoDBExport.java @@ -117,6 +117,11 @@ private void setTableProperties(JobConf jobConf, String tableName, Double readRa description.provisionedThroughput().readCapacityUnits().toString()); jobConf.set(DynamoDBConstants.WRITE_THROUGHPUT, description.provisionedThroughput().writeCapacityUnits().toString()); + // Assume auto-scaling enabled for PROVISIONED tables + jobConf.set(DynamoDBConstants.READ_THROUGHPUT_AUTOSCALING, + DynamoDBConstants.DEFAULT_THROUGHPUT_AUTOSCALING); + jobConf.set(DynamoDBConstants.WRITE_THROUGHPUT_AUTOSCALING, + DynamoDBConstants.DEFAULT_THROUGHPUT_AUTOSCALING); } else { // If not specified at the table level, set a hard coded value of 40,000 jobConf.set(DynamoDBConstants.READ_THROUGHPUT, diff --git a/emr-dynamodb-tools/src/main/java/org/apache/hadoop/dynamodb/tools/DynamoDBImport.java b/emr-dynamodb-tools/src/main/java/org/apache/hadoop/dynamodb/tools/DynamoDBImport.java index be5e6c05..e8ea01fa 100644 --- a/emr-dynamodb-tools/src/main/java/org/apache/hadoop/dynamodb/tools/DynamoDBImport.java +++ b/emr-dynamodb-tools/src/main/java/org/apache/hadoop/dynamodb/tools/DynamoDBImport.java @@ -100,6 +100,11 @@ private void setTableProperties(JobConf jobConf, String tableName, Double writeR description.provisionedThroughput().readCapacityUnits().toString()); jobConf.set(DynamoDBConstants.WRITE_THROUGHPUT, description.provisionedThroughput().writeCapacityUnits().toString()); + // Assume auto-scaling enabled for PROVISIONED tables + jobConf.set(DynamoDBConstants.READ_THROUGHPUT_AUTOSCALING, + DynamoDBConstants.DEFAULT_THROUGHPUT_AUTOSCALING); + jobConf.set(DynamoDBConstants.WRITE_THROUGHPUT_AUTOSCALING, + DynamoDBConstants.DEFAULT_THROUGHPUT_AUTOSCALING); } else { jobConf.set(DynamoDBConstants.READ_THROUGHPUT, DynamoDBConstants.DEFAULT_CAPACITY_FOR_ON_DEMAND.toString());