HIVE-28165 HiveSplitGenerator: send splits through filesystem instead of RPC in case of big payload
abstractdog committed Apr 1, 2024
1 parent 0bc624a commit 3d44e09
Showing 6 changed files with 400 additions and 6 deletions.
4 changes: 4 additions & 0 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4756,6 +4756,10 @@ public static enum ConfVars {
"Class to use for calculating available slots during split generation"),
HIVE_TEZ_GENERATE_CONSISTENT_SPLITS("hive.tez.input.generate.consistent.splits", true,
"Whether to generate consistent split locations when generating splits in the AM"),
HIVE_TEZ_SPLIT_FS_SERIALIZATION_THRESHOLD("hive.tez.split.fs.serialization.threshold", 524288,
"Splits (as InputDataInformationEvent objects) larger than this (in bytes) will be serialized" +
" to tez scratchdir instead of being sent as RPC payloads directly. Default is 512 KB."
+ "-1 disables this feature."),
HIVE_PREWARM_ENABLED("hive.prewarm.enabled", false, "Enables container prewarm for Tez(Hadoop 2 only)"),
HIVE_PREWARM_NUM_CONTAINERS("hive.prewarm.numcontainers", 10, "Controls the number of containers to prewarm for Tez (Hadoop 2 only)"),
HIVE_STAGE_ID_REARRANGE("hive.stageid.rearrange", "none", new StringSet("none", "idonly", "traverse", "execution"), ""),
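For reference, a minimal caller-side sketch (not part of the commit) of how the new threshold can be read and overridden through the same HiveConf accessors the patch itself uses:

// Hypothetical snippet; getIntVar/setIntVar are the accessors used elsewhere in this patch.
HiveConf conf = new HiveConf();
// default is 524288 bytes (512 KB)
int threshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TEZ_SPLIT_FS_SERIALIZATION_THRESHOLD);
// -1 turns the fs serialization feature off entirely
HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_TEZ_SPLIT_FS_SERIALIZATION_THRESHOLD, -1);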
142 changes: 137 additions & 5 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -27,10 +27,18 @@
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;

import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -56,6 +64,8 @@
import org.apache.hadoop.mapred.split.SplitLocationProvider;
import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.VertexLocationHint;
@@ -75,6 +85,7 @@

import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
* This class is used to generate splits inside the AM on the cluster. It
@@ -154,6 +165,88 @@ private void prepare(InputInitializerContext initializerContext) throws IOExcept
LOG.info("SplitLocationProvider: " + splitLocationProvider);
}

/**
* SplitSerializer is a helper class that takes care of serializing splits to the tez scratch dir
* when the size threshold defined by "hive.tez.split.fs.serialization.threshold" is exceeded.
* It utilizes an ExecutorService for parallel writes, so that a single split write operation
* cannot become the bottleneck (write() is called from a loop).
*/
class SplitSerializer {
// fields needed for filepath
private String queryId;
private String inputName;
private Path appStagingPath;
// metrics
private AtomicInteger timeSpentWithSplitWriteMs = new AtomicInteger(0);
private AtomicInteger splitsWritten = new AtomicInteger(0);
// lazy initialized filesystem and executor
private FileSystem fs;
private ExecutorService executor;

/**
* Lazily initializes the filesystem and the executor service: nothing is initialized if no split is serialized at all.
* No synchronization is needed: this is only called from a single-threaded loop.
*/
private void lazyInit() throws IOException {
if (fs != null) {
return;
}
queryId = jobConf.get(HiveConf.ConfVars.HIVEQUERYID.varname);
inputName = getContext().getInputName();
appStagingPath = TezCommonUtils.getTezSystemStagingPath(conf, getContext().getApplicationId().toString());

fs = appStagingPath.getFileSystem(jobConf);
executor = Executors.newFixedThreadPool(8,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("HiveSplitGenerator.SplitSerializer Thread - " + "#%d").build());
}

private InputDataInformationEvent write(int count, MRSplitProto mrSplit) throws IOException {
lazyInit();

Path filePath = getSerializedFilePath(appStagingPath, queryId, inputName, count);

// parallel writes for better performance (this is called from a loop)
executor.submit(() -> {
try {
long now = Time.monotonicNow();
try (FSDataOutputStream out = fs.create(filePath, false)) {
mrSplit.writeTo(out);
}
splitsWritten.getAndIncrement();
long elapsed = Time.monotonicNow() - now;
timeSpentWithSplitWriteMs.getAndAdd((int) elapsed);
LOG.debug("Split #{} event to output path: {} written in {} ms", count, filePath, elapsed);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
// return the event now, we'll wait for the actual file write in close() if needed
return InputDataInformationEvent.createWithSerializedPath(count, filePath.toString());
}

private Path getSerializedFilePath(Path appStagingPath, String queryId, String inputName, int index) {
// e.g. staging_dir/events/queryid/inputtable_InputDataInformationEvent_0
return new Path(
String.format("%s/events/%s/%s_InputDataInformationEvent_%d", appStagingPath, queryId, inputName, index));
}

private void close() throws IOException {
if (fs != null) {
executor.shutdown();
try {
executor.awaitTermination(5, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new IOException("Interrupted while waiting for the split files to be written", e);
}
LOG.info("Time spent with writing {} splits to fs: {} ms", splitsWritten, timeSpentWithSplitWriteMs);
// fs is a cached, shared FileSystem instance obtained via Path.getFileSystem, so it is deliberately not closed here
}
}
}
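
For context, a minimal sketch of the read-back side: a consumer holding the path carried by such an event can recover the split proto. This is illustrative only (the actual deserialization happens in the Tez runtime), and pathFromEvent is a hypothetical variable:

// Sketch: re-open the file written by SplitSerializer.write() and parse the proto back.
Path serializedPath = new Path(pathFromEvent); // the path carried by InputDataInformationEvent
FileSystem fs = serializedPath.getFileSystem(jobConf);
try (FSDataInputStream in = fs.open(serializedPath)) {
MRSplitProto mrSplit = MRSplitProto.parseFrom(in);
// ... rebuild the InputSplit from the proto
}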

@SuppressWarnings("unchecked")
@Override
public List<Event> initialize() throws Exception {
@@ -351,7 +444,9 @@ private InputSplit[] pruneBuckets(MapWork work, InputSplit[] splits) {
return splits;
}

@VisibleForTesting
List<Event> createEventList(boolean sendSerializedEvents, InputSplitInfoMem inputSplitInfo)
throws IOException {

List<Event> events = Lists.newArrayListWithCapacity(inputSplitInfo.getNumTasks() + 1);

@@ -362,13 +457,42 @@ private List<Event> createEventList(boolean sendSerializedEvents, InputSplitInfo
events.add(configureVertexEvent);

if (sendSerializedEvents) {
int count = 0;
long inMemoryPayloadSize = 0;
long serializedPayloadSize = 0;

int payloadSerializationThresholdBytes =
HiveConf.getIntVar(jobConf, HiveConf.ConfVars.HIVE_TEZ_SPLIT_FS_SERIALIZATION_THRESHOLD);
SplitSerializer splitSerializer = new SplitSerializer();

List<MRSplitProto> splits = inputSplitInfo.getSplitsProto().getSplitsList();

LOG.info("Start creating events for {} splits", splits.size());

for (MRSplitProto mrSplit : splits) {
ByteBuffer payloadBuffer = mrSplit.toByteString().asReadOnlyByteBuffer();
int payloadSize = payloadBuffer.limit();
boolean shouldSerializeEventToFile =
payloadSerializationThresholdBytes != -1 && payloadSize > payloadSerializationThresholdBytes;
LOG.debug("Split #{} ByteBuffer size: {} bytes, serialize to file: {} (threshold: {} bytes)", count, payloadSize,
shouldSerializeEventToFile, payloadSerializationThresholdBytes);

InputDataInformationEvent diEvent = null;

if (shouldSerializeEventToFile) {
serializedPayloadSize += payloadSize;
diEvent = splitSerializer.write(count, mrSplit);
} else {
inMemoryPayloadSize += payloadSize;
diEvent = InputDataInformationEvent.createWithSerializedPayload(count, payloadBuffer);
}
events.add(diEvent);
count += 1;
}
splitSerializer.close();
// these totals are useful for tuning the split serialization threshold
LOG.info("Finished creating events ({} splits), total payload size: in memory: {} bytes, serialized to fs: {} bytes",
splits.size(), inMemoryPayloadSize, serializedPayloadSize);
} else {
int count = 0;
for (org.apache.hadoop.mapred.InputSplit split : inputSplitInfo.getOldFormatSplits()) {
Expand Down Expand Up @@ -444,4 +568,12 @@ private AvailableSlotsCalculator getAvailableSlotsCalculator() throws Exception
slotsCalculator.initialize(conf, this);
return slotsCalculator;
}

/**
* Convenience method for callers that want to disable the serialization of splits to the filesystem:
* passing false sets the threshold to -1 (feature disabled), passing true leaves the configured threshold untouched.
*/
public HiveSplitGenerator splitFsSerialization(boolean splitFsSerialization) {
if (!splitFsSerialization) {
HiveConf.setIntVar(jobConf, HiveConf.ConfVars.HIVE_TEZ_SPLIT_FS_SERIALIZATION_THRESHOLD, -1);
}
return this;
}
}
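To make the threshold semantics concrete, a standalone illustrative helper (not part of the commit) that mirrors the decision rule in createEventList above:

// Mirrors: payloadSerializationThresholdBytes != -1 && payloadSize > payloadSerializationThresholdBytes
static boolean shouldSerializeToFile(int payloadSize, int thresholdBytes) {
return thresholdBytes != -1 && payloadSize > thresholdBytes;
}
// With the default threshold of 524288 (512 KB):
// shouldSerializeToFile(4 * 1024, 524288)   -> false (split stays an inline RPC payload)
// shouldSerializeToFile(900 * 1024, 524288) -> true  (split is written to the tez scratch dir)
// shouldSerializeToFile(900 * 1024, -1)     -> false (feature disabled)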
@@ -465,7 +465,8 @@ private SplitResult getSplits(JobConf job, TezWork work, Schema schema, Applicat
Preconditions.checkState(HiveConf.getBoolVar(wxConf,
ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS));

HiveSplitGenerator splitGenerator =
new HiveSplitGenerator(wxConf, mapWork, false, inputArgNumSplits).splitFsSerialization(false);
List<Event> eventList = splitGenerator.initialize();
int numGroupedSplitsGenerated = eventList.size() - 1;
InputSplit[] result = new InputSplit[numGroupedSplitsGenerated];