Set RCU/WCU for PROVISIONED tables while maintaining support for autoscaling #178

Merged · 1 commit · Mar 30, 2023
@@ -60,11 +60,14 @@ public interface DynamoDBConstants {
String THROUGHPUT_READ_PERCENT = "dynamodb.throughput.read.percent";
String READ_THROUGHPUT = "dynamodb.throughput.read";
String WRITE_THROUGHPUT = "dynamodb.throughput.write";
String READ_THROUGHPUT_AUTOSCALING = "dynamodb.throughput.read.autoscaling";
String WRITE_THROUGHPUT_AUTOSCALING = "dynamodb.throughput.write.autoscaling";
String AVG_ITEM_SIZE = "dynamodb.item.average.size";
String ITEM_COUNT = "dynamodb.item.count";
String TABLE_SIZE_BYTES = "dynamodb.table.size-bytes";
String MAX_MAP_TASKS = "dynamodb.max.map.tasks";
String DEFAULT_THROUGHPUT_PERCENTAGE = "0.5";
String DEFAULT_THROUGHPUT_AUTOSCALING = "true";
String BILLING_MODE_PROVISIONED = BillingMode.PROVISIONED.toString();

String DYNAMODB_MAX_ITEM_SIZE = "dynamodb.max.item.size";
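
For context, a minimal sketch of how a job could combine the new keys through the standard Hadoop JobConf API; the property names are the constants defined above, while the class and values are hypothetical:

import org.apache.hadoop.mapred.JobConf;

public class ThroughputConfigExample {
  public static void main(String[] args) {
    JobConf jobConf = new JobConf();
    // Pin read throughput for the scan to 500 RCU.
    jobConf.set("dynamodb.throughput.read", "500");
    // Opt into autoscaling: the IOPS calculators then ignore the pinned value
    // and re-read the table's live provisioned capacity from DynamoDB instead.
    jobConf.set("dynamodb.throughput.read.autoscaling", "true");
  }
}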
@@ -55,17 +55,24 @@ public ReadIopsCalculator(JobClient jobClient, DynamoDBClient dynamoDBClient, St
}

public long calculateTargetIops() {
double configuredThroughput = Math.floor(Double.parseDouble(
jobConf.get(DynamoDBConstants.READ_THROUGHPUT,
String.valueOf(getThroughput()))) * throughputPercent);
long throughputPerTask = Math.max((long) (configuredThroughput / totalSegments
double configuredThroughput;
// Always fetch throughput from DDB if auto-scaling is enabled or no read throughput is specified
if (Boolean.parseBoolean(jobConf.get(DynamoDBConstants.READ_THROUGHPUT_AUTOSCALING))
|| jobConf.get(DynamoDBConstants.READ_THROUGHPUT) == null) {
configuredThroughput = getThroughput();
} else {
configuredThroughput = Double.parseDouble(jobConf.get(DynamoDBConstants.READ_THROUGHPUT));
}
double calculatedThroughput = Math.floor(configuredThroughput * throughputPercent);

long throughputPerTask = Math.max((long) (calculatedThroughput / totalSegments
* localSegments), 1);

log.info("Throughput per task for table " + tableName + " : " + throughputPerTask);
return throughputPerTask;
}

private double getThroughput() {
protected double getThroughput() {
TableDescription tableDescription = dynamoDBClient.describeTable(tableName);
if (tableDescription.billingModeSummary() == null
|| tableDescription.billingModeSummary().billingMode() == BillingMode.PROVISIONED) {
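
To make the arithmetic concrete, a worked example with hypothetical numbers: the table is described at 1000 RCU (the autoscaling path), the default read percent of 0.5 applies, and 4 of 100 total segments are assigned to this task:

public class ReadIopsMathExample {
  public static void main(String[] args) {
    double configuredThroughput = 1000;  // returned by getThroughput() via DescribeTable
    double throughputPercent = 0.5;      // DEFAULT_THROUGHPUT_PERCENTAGE
    int totalSegments = 100;
    int localSegments = 4;

    double calculatedThroughput = Math.floor(configuredThroughput * throughputPercent); // 500.0
    long throughputPerTask =
        Math.max((long) (calculatedThroughput / totalSegments * localSegments), 1);
    System.out.println(throughputPerTask); // 20
  }
}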
@@ -70,10 +70,16 @@ public WriteIopsCalculator(JobClient jobClient, DynamoDBClient dynamoDBClient, S
}

public long calculateTargetIops() {
double configuredThroughput = Math.floor(Double.parseDouble(
jobConf.get(DynamoDBConstants.WRITE_THROUGHPUT, String.valueOf(getThroughput())))
* throughputPercent);
long throughputPerTask = Math.max((long) (configuredThroughput / maxParallelTasks), 1);
double configuredThroughput;
// Always fetch throughput from DDB if auto-scaling is enabled or no write throughput is specified
if (Boolean.parseBoolean(jobConf.get(DynamoDBConstants.WRITE_THROUGHPUT_AUTOSCALING))
|| jobConf.get(DynamoDBConstants.WRITE_THROUGHPUT) == null) {
configuredThroughput = getThroughput();
} else {
configuredThroughput = Double.parseDouble(jobConf.get(DynamoDBConstants.WRITE_THROUGHPUT));
}
double calculatedThroughput = Math.floor(configuredThroughput * throughputPercent);
long throughputPerTask = Math.max((long) (calculatedThroughput / maxParallelTasks), 1);

log.info("Throughput per task for table " + tableName + " : " + throughputPerTask);
return throughputPerTask;
@@ -88,7 +94,7 @@ int calculateMaxMapTasks(int totalMapTasks) {
return totalMapTasks;
}

private double getThroughput() {
protected double getThroughput() {
TableDescription tableDescription = dynamoDBClient.describeTable(tableName);
if (tableDescription.billingModeSummary() == null
|| tableDescription.billingModeSummary().billingMode() == BillingMode.PROVISIONED) {
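
The write path applies the same precedence rule as the read path: an explicit autoscaling flag forces a DescribeTable lookup, and a pinned throughput is honored only when autoscaling is off. A small sketch of the three configuration states, reusing the condition from calculateTargetIops above (values hypothetical):

import org.apache.hadoop.mapred.JobConf;

public class WriteThroughputPrecedenceExample {
  // Mirrors the condition in WriteIopsCalculator.calculateTargetIops().
  static boolean fetchFromDynamoDB(JobConf conf) {
    return Boolean.parseBoolean(conf.get("dynamodb.throughput.write.autoscaling"))
        || conf.get("dynamodb.throughput.write") == null;
  }

  public static void main(String[] args) {
    JobConf unset = new JobConf();
    System.out.println(fetchFromDynamoDB(unset));   // true: nothing set, ask DynamoDB

    JobConf pinned = new JobConf();
    pinned.set("dynamodb.throughput.write", "500");
    System.out.println(fetchFromDynamoDB(pinned));  // false: the pinned value wins

    pinned.set("dynamodb.throughput.write.autoscaling", "true");
    System.out.println(fetchFromDynamoDB(pinned));  // true: autoscaling overrides the pin
  }
}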
@@ -14,16 +14,20 @@
package org.apache.hadoop.dynamodb.read;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.apache.hadoop.dynamodb.DynamoDBClient;
import org.apache.hadoop.dynamodb.DynamoDBConstants;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.BillingModeSummary;
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputDescription;
import software.amazon.awssdk.services.dynamodb.model.TableDescription;
@@ -43,31 +47,55 @@ public class ReadIopsCalculatorTest {

private ReadIopsCalculator readIopsCalculator;

@Before
public void setup() {
@Test
public void testCalculateTargetIops() {
JobConf jobConf = new JobConf();
readIopsCalculator = getReadIopsCalculator(jobConf);

long readIops = readIopsCalculator.calculateTargetIops();
long expectedReadIops = (long) (READ_CAPACITY_UNITS * THROUGHPUT_READ_PERCENT *
LOCAL_SEGMENTS / TOTAL_SEGMETNS);
assertEquals(expectedReadIops, readIops);
}

@Test
public void testCalculateIopsAutoscalingEnabled() {
JobConf jobConf = new JobConf();
jobConf.set(DynamoDBConstants.READ_THROUGHPUT, "500");
readIopsCalculator = getReadIopsCalculator(jobConf);

ReadIopsCalculator spyIopsCalculator = Mockito.spy(readIopsCalculator);
doReturn((double) 1000).when(spyIopsCalculator).getThroughput();
long readIops = spyIopsCalculator.calculateTargetIops();
long expectedReadIops = (long) (500 * THROUGHPUT_READ_PERCENT *
LOCAL_SEGMENTS / TOTAL_SEGMETNS);
assertEquals(expectedReadIops, readIops);
// Autoscaling not enabled and throughput specified by the user, so it should not be fetched
verify(spyIopsCalculator, times(0)).getThroughput();

jobConf.set(DynamoDBConstants.READ_THROUGHPUT_AUTOSCALING, "true");
long readIops2 = spyIopsCalculator.calculateTargetIops();
long expectedReadIops2 = (long) (1000 * THROUGHPUT_READ_PERCENT *
LOCAL_SEGMENTS / TOTAL_SEGMETNS);
assertEquals(expectedReadIops2, readIops2);
// Autoscaling enabled, throughput should be fetched regardless of whether the user specified it
verify(spyIopsCalculator, times(1)).getThroughput();
}

private ReadIopsCalculator getReadIopsCalculator(JobConf jobConf) {
when(dynamoDBClient.describeTable(TABLE_NAME)).thenReturn(TableDescription.builder()
.billingModeSummary(BillingModeSummary.builder()
.billingMode(DynamoDBConstants.BILLING_MODE_PROVISIONED)
.billingMode(BillingMode.PROVISIONED)
.build())
.provisionedThroughput(ProvisionedThroughputDescription.builder()
.readCapacityUnits(READ_CAPACITY_UNITS)
.build())
.build());

JobConf jobConf = new JobConf();
jobConf.set(DynamoDBConstants.THROUGHPUT_READ_PERCENT, String.valueOf(THROUGHPUT_READ_PERCENT));
when(jobClient.getConf()).thenReturn(jobConf);
doReturn(jobConf).when(jobClient).getConf();

readIopsCalculator = new ReadIopsCalculator(jobClient, dynamoDBClient, TABLE_NAME,
return new ReadIopsCalculator(jobClient, dynamoDBClient, TABLE_NAME,
TOTAL_SEGMETNS, LOCAL_SEGMENTS);
}

@Test
public void testCalculateTargetIops() {
long readIops = readIopsCalculator.calculateTargetIops();
long expectedReadIops = (long) (READ_CAPACITY_UNITS * THROUGHPUT_READ_PERCENT *
LOCAL_SEGMENTS / TOTAL_SEGMETNS);
assertEquals(expectedReadIops, readIops);
}

}
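
One detail of the test above worth noting: with a Mockito spy, the usual when(...).thenReturn(...) form would invoke the real getThroughput() during stubbing, hitting the mocked DynamoDB client before the stub takes effect. The doReturn(...).when(...) form stubs without calling through:

// Safe with spies: the real getThroughput() is never invoked while stubbing.
doReturn((double) 1000).when(spyIopsCalculator).getThroughput();

// Risky with spies: evaluates spyIopsCalculator.getThroughput() for real first.
// when(spyIopsCalculator.getThroughput()).thenReturn((double) 1000);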
@@ -14,16 +14,19 @@
package org.apache.hadoop.dynamodb.write;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.apache.hadoop.dynamodb.DynamoDBClient;
import org.apache.hadoop.dynamodb.DynamoDBConstants;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.BillingModeSummary;
@@ -45,39 +48,62 @@ public class WriteIopsCalculatorTest {

private WriteIopsCalculator writeIopsCalculator;

@Before
public void setup() {
@Test
public void testCalculateTargetIops() {
JobConf jobConf = new JobConf();
writeIopsCalculator = getWriteIopsCalculator(jobConf);

long writeIops = writeIopsCalculator.calculateTargetIops();
long expectedWriteIops = (long) (WRITE_CAPACITY_UNITS * THROUGHPUT_WRITE_PERCENT / Math.min
(MAX_CONCURRENT_MAP_TASKS, TOTAL_MAP_TASKS));
assertEquals(expectedWriteIops, writeIops);
}

@Test
public void testCalculateIopsAutoscalingEnabled() {
JobConf jobConf = new JobConf();
jobConf.set(DynamoDBConstants.WRITE_THROUGHPUT, "500");
writeIopsCalculator = getWriteIopsCalculator(jobConf);

WriteIopsCalculator spyIopsCalculator = Mockito.spy(writeIopsCalculator);
doReturn((double) 1000).when(spyIopsCalculator).getThroughput();
long writeIops = spyIopsCalculator.calculateTargetIops();
long expectedWriteIops = (long) (500 * THROUGHPUT_WRITE_PERCENT / Math.min
(MAX_CONCURRENT_MAP_TASKS, TOTAL_MAP_TASKS));
assertEquals(expectedWriteIops, writeIops);
// Autoscaling not enabled and throughput specified by the user, so it should not be fetched
verify(spyIopsCalculator, times(0)).getThroughput();

jobConf.set(DynamoDBConstants.WRITE_THROUGHPUT_AUTOSCALING, "true");
long writeIops2 = spyIopsCalculator.calculateTargetIops();
long expectedWriteIops2 = (long) (1000 * THROUGHPUT_WRITE_PERCENT / Math.min
(MAX_CONCURRENT_MAP_TASKS, TOTAL_MAP_TASKS));
assertEquals(expectedWriteIops2, writeIops2);
// Autoscaling enabled, throughput should be fetched regardless of whether the user specified it
verify(spyIopsCalculator, times(1)).getThroughput();
}

private WriteIopsCalculator getWriteIopsCalculator(JobConf jobConf) {
when(dynamoDBClient.describeTable(TABLE_NAME)).thenReturn(TableDescription.builder()
.billingModeSummary(
BillingModeSummary.builder()
.billingModeSummary(BillingModeSummary.builder()
.billingMode(BillingMode.PROVISIONED)
.build())
.provisionedThroughput(
ProvisionedThroughputDescription.builder()
.writeCapacityUnits(WRITE_CAPACITY_UNITS)
.build())
.provisionedThroughput(ProvisionedThroughputDescription.builder()
.writeCapacityUnits(WRITE_CAPACITY_UNITS)
.build())
.build());

JobConf jobConf = new JobConf();
jobConf.setNumMapTasks(TOTAL_MAP_TASKS);
jobConf.set("mapreduce.task.attempt.id", "attempt_m_1");
jobConf.set(DynamoDBConstants.THROUGHPUT_WRITE_PERCENT, String.valueOf
(THROUGHPUT_WRITE_PERCENT));
jobConf.set(DynamoDBConstants.THROUGHPUT_WRITE_PERCENT,
String.valueOf(THROUGHPUT_WRITE_PERCENT));
when(jobClient.getConf()).thenReturn(jobConf);

writeIopsCalculator = new WriteIopsCalculator(jobClient, dynamoDBClient, TABLE_NAME) {
return new WriteIopsCalculator(jobClient, dynamoDBClient, TABLE_NAME) {
@Override
int calculateMaxMapTasks(int totalMapTasks) {
return MAX_CONCURRENT_MAP_TASKS;
}
};
}

@Test
public void testCalculateTargetIops() {
long writeIops = writeIopsCalculator.calculateTargetIops();
long expectedWriteIops = (long) (WRITE_CAPACITY_UNITS * THROUGHPUT_WRITE_PERCENT / Math.min
(MAX_CONCURRENT_MAP_TASKS, TOTAL_MAP_TASKS));
assertEquals(expectedWriteIops, writeIops);
}
}
emr-dynamodb-hive/pom.xml (6 additions, 0 deletions)
@@ -112,6 +112,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<build>
@@ -119,8 +119,7 @@ public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer dese

@Override
public void configureTableJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
DynamoDBClient client =
new DynamoDBClient(conf, tableDesc.getProperties().getProperty(DynamoDBConstants.REGION));
DynamoDBClient client = createDynamoDBClient(tableDesc);

try {
String tableName = HiveDynamoDBUtil.getDynamoDBTableName(tableDesc.getProperties()
@@ -191,7 +190,22 @@ public void configureTableJobProperties(TableDesc tableDesc, Map<String, String>

if (description.billingModeSummary() == null
|| description.billingModeSummary().billingMode() == BillingMode.PROVISIONED) {
useExplicitThroughputIfRequired(jobProperties, tableDesc);
// If not specified at the table level, get initial read/write capacity from DDB
jobProperties.put(DynamoDBConstants.READ_THROUGHPUT, tableDesc.getProperties()
.getProperty(DynamoDBConstants.READ_THROUGHPUT,
description.provisionedThroughput().readCapacityUnits().toString()));
jobProperties.put(DynamoDBConstants.WRITE_THROUGHPUT, tableDesc.getProperties()
.getProperty(DynamoDBConstants.WRITE_THROUGHPUT,
description.provisionedThroughput().writeCapacityUnits().toString()));
// Assume auto-scaling enabled for PROVISIONED tables if throughput not specified by user
if (tableDesc.getProperties().getProperty(DynamoDBConstants.READ_THROUGHPUT) == null) {
jobProperties.put(DynamoDBConstants.READ_THROUGHPUT_AUTOSCALING,
DynamoDBConstants.DEFAULT_THROUGHPUT_AUTOSCALING);
}
if (tableDesc.getProperties().getProperty(DynamoDBConstants.WRITE_THROUGHPUT) == null) {
jobProperties.put(DynamoDBConstants.WRITE_THROUGHPUT_AUTOSCALING,
DynamoDBConstants.DEFAULT_THROUGHPUT_AUTOSCALING);
}
} else {
// If not specified at the table level, set default value
jobProperties.put(DynamoDBConstants.READ_THROUGHPUT, tableDesc.getProperties()
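
To summarize the precedence in the PROVISIONED branch, a self-contained sketch that mirrors the read-side logic above; the helper is hypothetical and stands in for the tableDesc/DescribeTable plumbing, not the storage handler's actual API:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ProvisionedThroughputResolutionExample {
  // tableProps mirrors Hive table properties; describedRcu mirrors DescribeTable output.
  static Map<String, String> resolveRead(Properties tableProps, long describedRcu) {
    Map<String, String> jobProperties = new HashMap<>();
    jobProperties.put("dynamodb.throughput.read",
        tableProps.getProperty("dynamodb.throughput.read", Long.toString(describedRcu)));
    if (tableProps.getProperty("dynamodb.throughput.read") == null) {
      jobProperties.put("dynamodb.throughput.read.autoscaling", "true");
    }
    return jobProperties;
  }

  public static void main(String[] args) {
    // User pinned 500 RCU at the table level: pinned value used, no autoscaling flag.
    Properties pinned = new Properties();
    pinned.setProperty("dynamodb.throughput.read", "500");
    System.out.println(resolveRead(pinned, 1000));

    // Nothing pinned: the table's current capacity is recorded and autoscaling assumed.
    System.out.println(resolveRead(new Properties(), 1000));
  }
}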
@@ -222,21 +236,6 @@ public void configureInputJobCredentials(TableDesc tableDesc, Map<String, String
throw new AbstractMethodError("configureInputJobCredentials not supported");
}

private void useExplicitThroughputIfRequired(Map<String, String> jobProperties,
TableDesc tableDesc) {
String userRequiredReadThroughput =
tableDesc.getProperties().getProperty(DynamoDBConstants.READ_THROUGHPUT);
if (userRequiredReadThroughput != null) {
jobProperties.put(DynamoDBConstants.READ_THROUGHPUT, userRequiredReadThroughput);
}

String userRequiredWriteThroughput =
tableDesc.getProperties().getProperty(DynamoDBConstants.WRITE_THROUGHPUT);
if (userRequiredWriteThroughput != null) {
jobProperties.put(DynamoDBConstants.WRITE_THROUGHPUT, userRequiredWriteThroughput);
}
}

@Override
public Class<? extends InputFormat<Text, DynamoDBItemWritable>> getInputFormatClass() {
return HiveDynamoDBInputFormat.class;
@@ -375,11 +374,16 @@ void checkTableSchemaType(TableDescription tableDescription, Table table) throws
}
}

private DynamoDBClient createDynamoDBClient(Table table) {
protected DynamoDBClient createDynamoDBClient(Table table) {
String region = table.getParameters().get(DynamoDBConstants.REGION);
return new DynamoDBClient(conf, region);
}

protected DynamoDBClient createDynamoDBClient(TableDesc tableDesc) {
String region = tableDesc.getProperties().getProperty(DynamoDBConstants.REGION);
return new DynamoDBClient(conf, region);
}

private void checkTableStatus(TableDescription tableDescription) throws MetaException {
String status = tableDescription.tableStatusAsString();
