Skip to content

Commit

Permalink
HIVE-27494: Deduplicate the task result that is generated by more branch…
Browse files Browse the repository at this point in the history
…es in union all
  • Loading branch information
dengzhhu653 committed Jul 12, 2023
1 parent 8fcef2c commit ac00fbf
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 18 deletions.
14 changes: 12 additions & 2 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,17 @@ private void initializeSpecPath() {
unionPath = null;
} else if (conf.isMmTable() || isUnionDp) {
// MM tables need custom handling for union suffix; DP tables use parent too.
specPath = conf.getParentDir();
// For the !isDirectInsert and !conf.isMmTable() cases, the output will look like:
// w1: <table-dir>/<staging-dir>/_tmp.-ext-10000/<partition-dir>/HIVE_UNION_SUBDIR_1/<task_id>
// w2: <table-dir>/<staging-dir>/_tmp.-ext-10000/<partition-dir>/HIVE_UNION_SUBDIR_2/<task_id>
// When the BaseWork w2 in a TezTask closes first, it may rename the entire directory
// <table-dir>/<staging-dir>/_tmp.-ext-10000 to <table-dir>/<staging-dir>/_tmp.-ext-10000.moved.
// Setting specPath to conf.getDirName() gives w1 a chance to deal with its own output under
// the directory HIVE_UNION_SUBDIR_1; the output directory will then be
// <table-dir>/<staging-dir>/-ext-10000/_tmp.HIVE_UNION_SUBDIR_1/<partition-dir>/HIVE_UNION_SUBDIR_1.
// When the job finishes, the output will be moved to
// <table-dir>/<staging-dir>/-ext-10000/<partition-dir>/HIVE_UNION_SUBDIR_1 as it was before.
specPath = conf.isMmTable() ? conf.getParentDir() : conf.getDirName();
unionPath = conf.getDirName().getName();
} else {
// For now, keep the old logic for non-MM non-DP union case. Should probably be unified.
Expand Down Expand Up @@ -1585,7 +1595,7 @@ public void jobCloseOp(Configuration hconf, boolean success)
DynamicPartitionCtx dpCtx = conf.getDynPartCtx();
ListBucketingCtx lbCtx = conf.getLbCtx();
if (conf.isLinkedFileSink() && (dpCtx != null || conf.isMmTable())) {
specPath = conf.getParentDir();
specPath = conf.isMmTable() ? conf.getParentDir() : conf.getDirName();
unionSuffix = conf.getDirName().getName();
}
if (conf.isLinkedFileSink() && conf.isDirectInsert()) {
Expand Down
3 changes: 3 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
Original file line number Diff line number Diff line change
Expand Up @@ -1435,6 +1435,9 @@ public static void mvFileToFinalPath(Path specPath, String unionSuffix, Configur
FileSystem fs = specPath.getFileSystem(hconf);
Path tmpPath = Utilities.toTempPath(specPath);
Path taskTmpPath = Utilities.toTaskTempPath(specPath);
if (!StringUtils.isEmpty(unionSuffix)) {
specPath = specPath.getParent();
}
PerfLogger perfLogger = SessionState.getPerfLogger();
boolean isBlobStorage = BlobStorageUtils.isBlobStorageFileSystem(hconf, fs);
boolean avoidRename = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.hive.ql.exec;

import org.junit.After;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -78,7 +79,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -172,8 +172,9 @@ public void testNonAcidRemoveDuplicate() throws Exception {
setupData(DataFormat.WITH_PARTITION_VALUE);

FileSinkDesc desc = (FileSinkDesc) getFileSink(AcidUtils.Operation.NOT_ACID, true, 0).getConf().clone();
Path linkedDir = desc.getDirName();
desc.setLinkedFileSink(true);
desc.setDirName(new Path(desc.getDirName(), AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "0"));
desc.setDirName(new Path(linkedDir, AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "0"));
JobConf jobConf = new JobConf(jc);
jobConf.set("hive.execution.engine", "tez");
jobConf.set("mapred.task.id", "000000_0");
Expand All @@ -188,41 +189,85 @@ public void testNonAcidRemoveDuplicate() throws Exception {
op2.setConf(desc);
op2.initialize(jobConf2, new ObjectInspector[]{inspector});

// Another sub-query in union
JobConf jobConf3 = new JobConf(jobConf);
jobConf3.set("mapred.task.id", "000001_0");
FileSinkOperator op3 = (FileSinkOperator)OperatorFactory.get(
new CompilationOpContext(), FileSinkDesc.class);
FileSinkDesc sinkDesc = (FileSinkDesc) desc.clone();
sinkDesc.setDirName(new Path(linkedDir, AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1"));
op3.setConf(sinkDesc);
op3.initialize(jobConf3, new ObjectInspector[]{inspector});

JobConf jobConf4 = new JobConf(jobConf);
jobConf4.set("mapred.task.id", "000001_1");
FileSinkOperator op4 = (FileSinkOperator)OperatorFactory.get(
new CompilationOpContext(), FileSinkDesc.class);
op4.setConf(sinkDesc);
op4.initialize(jobConf4, new ObjectInspector[]{inspector});

for (Object r : rows) {
op1.process(r, 0);
op2.process(r, 0);
op3.process(r, 0);
op4.process(r, 0);
}

op1.close(false);
// Assume op2 also ends successfully, this happens in different containers
op2.close(false);
Path[] paths = findFilesInBasePath();
List<Path> mondays = Arrays.stream(paths)
.filter(path -> path.getParent().toString().endsWith("partval=Monday/HIVE_UNION_SUBDIR_0"))
.collect(Collectors.toList());
Assert.assertEquals("Two result files are expected", 2, mondays.size());
Set<String> fileNames = new HashSet<>();
fileNames.add(mondays.get(0).getName());
fileNames.add(mondays.get(1).getName());
op3.close(false);
op4.close(false);

Path[] paths = findFilesInPath(linkedDir);
// = findFilesInBasePath() # use findFilesInBasePath before the fix
Set<String> fileNames = Arrays.stream(paths)
.filter(path -> path.getParent().toString().endsWith("partval=Monday/HIVE_UNION_SUBDIR_0"))
.map(path -> path.getName())
.collect(Collectors.toSet());
Assert.assertEquals("Two result files are expected", 2, fileNames.size());
Assert.assertTrue("000000_1 file is expected", fileNames.contains("000000_1"));
Assert.assertTrue("000000_0 file is expected", fileNames.contains("000000_0"));

fileNames = Arrays.stream(paths)
.filter(path -> path.getParent().toString().endsWith("partval=Monday/HIVE_UNION_SUBDIR_1"))
.map(path -> path.getName())
.collect(Collectors.toSet());
Assert.assertEquals("Two result files are expected", 2, fileNames.size());
Assert.assertTrue("000001_0 file is expected", fileNames.contains("000001_0"));
Assert.assertTrue("000001_1 file is expected", fileNames.contains("000001_1"));

// Close op3 first to see if it can deduplicate the result under HIVE_UNION_SUBDIR_0
op3.jobCloseOp(jobConf, true);
// This happens in HiveServer2 when the job has finished: the job calls
// jobCloseOp to close its operators. For the FileSinkOperator, a deduplication of the
// output files may happen so that only one output file is left for each yarn task.
op1.jobCloseOp(jobConf, true);
List<Path> resultFiles = new ArrayList<Path>();
recurseOnPath(basePath, basePath.getFileSystem(jc), resultFiles);
mondays = resultFiles.stream()
.filter(path -> path.getParent().toString().endsWith("partval=Monday/HIVE_UNION_SUBDIR_0"))
String linkedDirPath = linkedDir.toUri().getPath();
recurseOnPath(linkedDir, linkedDir.getFileSystem(jc), resultFiles);
List<Path> mondays = resultFiles.stream()
.filter(path -> path.getParent().toUri().getPath()
.equals(linkedDirPath + "/partval=Monday/HIVE_UNION_SUBDIR_0"))
.collect(Collectors.toList());
Assert.assertEquals("Only 1 file should be here after cleaning", 1, mondays.size());
Assert.assertEquals("000000_1 file is expected", "000000_1", mondays.get(0).getName());

confirmOutput(DataFormat.WITH_PARTITION_VALUE, resultFiles.toArray(new Path[0]));
// Clean out directory after testing
basePath.getFileSystem(jc).delete(basePath, true);
List<Path> subdir1 = resultFiles.stream()
.filter(path -> path.getParent().getName().equals("HIVE_UNION_SUBDIR_1")).sorted()
.collect(Collectors.toList());
Assert.assertEquals("Two partitions expected", 2, subdir1.size());
Path monday = subdir1.get(0), tuesday = subdir1.get(1);
Assert.assertEquals("Only 1 file left under the partition after deduplication", monday.toUri().getPath(),
linkedDirPath + "/partval=Monday/HIVE_UNION_SUBDIR_1/000001_1");
Assert.assertEquals("Only 1 file left under the partition after deduplication", tuesday.toUri().getPath(),
linkedDirPath + "/partval=Tuesday/HIVE_UNION_SUBDIR_1/000001_1");

// Confirm the output
confirmOutput(DataFormat.WITH_PARTITION_VALUE, resultFiles.stream()
.filter(p -> p.getParent().getName().equals("HIVE_UNION_SUBDIR_0")).sorted()
.collect(Collectors.toList()).toArray(new Path[0]));
confirmOutput(DataFormat.WITH_PARTITION_VALUE, subdir1.toArray(new Path[0]));
}

@Test
Expand Down Expand Up @@ -270,6 +315,16 @@ public void setup() throws Exception {
jc.set(HiveConf.ConfVars.HIVESTATSDBCLASS.varname, "custom");
}

@After
public void afterTest() throws Exception {
  // Clean up the test output directory along with the temporary side
  // directories ("_tmp.*" and "_task_tmp.*") produced during the run.
  FileSystem fs = basePath.getFileSystem(jc);
  Path parentDir = basePath.getParent();
  String baseName = basePath.getName();
  Path[] toRemove = {
      basePath,
      new Path(parentDir, "_tmp." + baseName),
      new Path(parentDir, "_task_tmp." + baseName)
  };
  for (Path p : toRemove) {
    fs.delete(p, true);
  }
}

private void setBasePath(String testName) {
basePath = new Path(new File(tmpdir, testName).getPath());

Expand Down Expand Up @@ -414,6 +469,13 @@ private Path[] findFilesInBasePath() throws IOException {
return paths.toArray(new Path[paths.size()]);
}

/**
 * Recursively collects every file found under the given path.
 *
 * @param path the root directory (or file) to scan
 * @return all files discovered beneath {@code path}
 * @throws IOException if the underlying file system cannot be accessed
 */
private Path[] findFilesInPath(Path path) throws IOException {
  List<Path> collected = new ArrayList<>();
  recurseOnPath(path, path.getFileSystem(jc), collected);
  return collected.toArray(new Path[0]);
}

private void recurseOnPath(Path p, FileSystem fs, List<Path> paths) throws IOException {
if (fs.getFileStatus(p).isDir()) {
FileStatus[] stats = fs.listStatus(p);
Expand Down

0 comments on commit ac00fbf

Please sign in to comment.