@@ -25,9 +25,11 @@ option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

import "server/io/FS.proto";
message StoreFileEntry {
required string name = 1;
required uint64 size = 2;
+optional Reference reference = 3;
}

message StoreFileList {
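For context, the newly imported `Reference` message (defined in FS.proto) carries a split key and a top/bottom range, so a tracked entry can now describe a reference file rather than only a plain hfile. Below is a minimal sketch of building such an entry once the protobuf classes are regenerated; the generated outer-class and package names are assumptions, and the file name is purely illustrative.

```java
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;               // assumed package
import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos; // assumed outer class

public class StoreFileEntrySketch {
  public static void main(String[] args) {
    // A split reference records the split key and whether it covers the top or bottom half.
    FSProtos.Reference ref = FSProtos.Reference.newBuilder()
      .setSplitkey(ByteString.copyFromUtf8("row-0500"))
      .setRange(FSProtos.Reference.Range.TOP)
      .build();

    // The tracked entry keeps name and size as before; the reference field is optional,
    // so entries for ordinary hfiles are unchanged.
    StoreFileTrackerProtos.StoreFileEntry entry = StoreFileTrackerProtos.StoreFileEntry.newBuilder()
      .setName("0123456789abcdef0123456789abcdef.parentregion") // illustrative reference-file name
      .setSize(0L)
      .setReference(ref)
      .build();

    System.out.println(entry);
  }
}
```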
@@ -81,7 +81,7 @@ public class HFileLink extends FileLink {
* The pattern should be used for hfile and reference links that can be found in
* /hbase/table/region/family/
*/
-private static final Pattern REF_OR_HFILE_LINK_PATTERN =
+public static final Pattern REF_OR_HFILE_LINK_PATTERN =
Pattern.compile(String.format("^(?:(%s)(?:=))?(%s)=(%s)-(.+)$", TableName.VALID_NAMESPACE_REGEX,
TableName.VALID_TABLE_QUALIFIER_REGEX, RegionInfoBuilder.ENCODED_REGION_NAME_REGEX));

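With the pattern widened to public, other components (for example a file-based store file tracker) can parse link and reference names directly. A small usage sketch follows; the sample name is illustrative, and the capture groups come straight from the format string above (namespace, table, encoded region, hfile).

```java
import java.util.regex.Matcher;
import org.apache.hadoop.hbase.io.HFileLink;

public class LinkNameSketch {
  public static void main(String[] args) {
    // namespace=table=encodedRegion-hfile (the namespace prefix is optional)
    String linkName = "ns1=testTable=24a53c5a0c3a5d2e8b1f0a9c7e6d5b4f-9c8b7a6d5e4f3a2b";
    Matcher m = HFileLink.REF_OR_HFILE_LINK_PATTERN.matcher(linkName);
    if (m.matches()) {
      String namespace = m.group(1);     // "ns1", or null for the default namespace
      String table = m.group(2);         // "testTable"
      String encodedRegion = m.group(3); // encoded name of the referenced region
      String hfile = m.group(4);         // name of the referenced hfile
      System.out.println(namespace + "/" + table + "/" + encodedRegion + "/" + hfile);
    }
  }
}
```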
@@ -56,7 +56,7 @@ public class Reference {
* For split HStoreFiles, it specifies if the file covers the lower half or the upper half of the
* key range
*/
-static enum Range {
+public static enum Range {
/** HStoreFile contains upper half of key range */
top,
/** HStoreFile contains lower half of key range */
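Making `Range` public lets code outside org.apache.hadoop.hbase.io reason about which half of the parent's key range a reference file serves. A minimal sketch, assuming the existing factory and accessor methods on `Reference` are reachable from the caller:

```java
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.util.Bytes;

public class ReferenceRangeSketch {
  public static void main(String[] args) {
    byte[] splitRow = Bytes.toBytes("row-0500");
    // A "top" reference belongs to the daughter that serves keys >= splitRow.
    Reference ref = Reference.createTopReference(splitRow);
    if (ref.getFileRegion() == Reference.Range.top) {
      System.out.println("reference covers the upper half, split key: "
        + Bytes.toString(ref.getSplitKey()));
    }
  }
}
```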
@@ -605,7 +605,7 @@ private void createMergedRegion(final MasterProcedureEnv env) throws IOException
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
final Path tableDir = CommonFSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable());
final FileSystem fs = mfs.getFileSystem();
-List<Path> mergedFiles = new ArrayList<>();
+List<StoreFileInfo> mergedFiles = new ArrayList<StoreFileInfo>();
HRegionFileSystem mergeRegionFs = HRegionFileSystem
.createRegionOnFileSystem(env.getMasterConfiguration(), fs, tableDir, mergedRegion);

@@ -622,11 +622,11 @@ private void createMergedRegion(final MasterProcedureEnv env) throws IOException
.setState(State.MERGING_NEW);
}

-private List<Path> mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem regionFs,
+private List<StoreFileInfo> mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem regionFs,
HRegionFileSystem mergeRegionFs, RegionInfo mergedRegion) throws IOException {
final TableDescriptor htd =
env.getMasterServices().getTableDescriptors().get(mergedRegion.getTable());
-List<Path> mergedFiles = new ArrayList<>();
+List<StoreFileInfo> mergedFiles = new ArrayList<StoreFileInfo>();
for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
String family = hcd.getNameAsString();
StoreFileTracker tracker =
@@ -643,7 +643,7 @@ private List<Path> mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem reg
// is running in a regionserver's Store context, or we might not be able
// to read the hfiles.
storeFileInfo.setConf(storeConfiguration);
-Path refFile = mergeRegionFs.mergeStoreFile(regionFs.getRegionInfo(), family,
+StoreFileInfo refFile = mergeRegionFs.mergeStoreFile(regionFs.getRegionInfo(), family,
new HStoreFile(storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED), tracker);
mergedFiles.add(refFile);
}
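Returning `StoreFileInfo` instead of `Path` means the merge result can be handed straight to the merged region's store file tracker without re-deriving file metadata from paths. A hedged sketch of that hand-off, assuming the `StoreFileTracker.add()` method and a tracker already created for the merged region; the helper name is illustrative:

```java
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;

public class MergedFilesHandOff {
  // Sketch only: record the reference files produced by the merge in the merged
  // region's tracker so a later region open does not need to list the filesystem.
  static void recordMergedFiles(StoreFileTracker mergedRegionTracker,
      List<StoreFileInfo> mergedFiles) throws IOException {
    if (!mergedFiles.isEmpty()) {
      mergedRegionTracker.add(mergedFiles);
    }
  }
}
```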
@@ -660,20 +660,46 @@ public void createDaughterRegions(final MasterProcedureEnv env) throws IOExcepti
HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
env.getMasterConfiguration(), fs, tabledir, getParentRegion(), false);
regionFs.createSplitsDir(daughterOneRI, daughterTwoRI);
+Pair<List<StoreFileInfo>, List<StoreFileInfo>> expectedReferences =
+splitStoreFiles(env, regionFs);
+final ExecutorService threadPool = Executors.newFixedThreadPool(2,
+new ThreadFactoryBuilder().setNameFormat("RegionCommitter-pool-%d").setDaemon(true)
+.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
+threadPool.submit(new Callable<Path>() {
+@Override
+public Path call() throws IOException {
+return regionFs.commitDaughterRegion(daughterOneRI, expectedReferences.getFirst(), env);
+}
+});
+threadPool.submit(new Callable<Path>() {
+@Override
+public Path call() throws IOException {
+return regionFs.commitDaughterRegion(daughterTwoRI, expectedReferences.getSecond(), env);
+}
+});
+// Shutdown the pool
+threadPool.shutdown();

-Pair<List<Path>, List<Path>> expectedReferences = splitStoreFiles(env, regionFs);
-
-assertSplitResultFilesCount(fs, expectedReferences.getFirst().size(),
-regionFs.getSplitsDir(daughterOneRI));
-regionFs.commitDaughterRegion(daughterOneRI, expectedReferences.getFirst(), env);
-assertSplitResultFilesCount(fs, expectedReferences.getFirst().size(),
-new Path(tabledir, daughterOneRI.getEncodedName()));
-
-assertSplitResultFilesCount(fs, expectedReferences.getSecond().size(),
-regionFs.getSplitsDir(daughterTwoRI));
-regionFs.commitDaughterRegion(daughterTwoRI, expectedReferences.getSecond(), env);
-assertSplitResultFilesCount(fs, expectedReferences.getSecond().size(),
-new Path(tabledir, daughterTwoRI.getEncodedName()));
+Configuration conf = env.getMasterConfiguration();
+// Wait for all the tasks to finish.
+// When splits ran on the RegionServer, how-long-to-wait-configuration was named
+// hbase.regionserver.fileSplitTimeout. If set, use its value.
+long fileSplitTimeout = conf.getLong("hbase.master.fileSplitTimeout",
+conf.getLong("hbase.regionserver.fileSplitTimeout", 600000));
+try {
+boolean stillRunning = !threadPool.awaitTermination(fileSplitTimeout, TimeUnit.MILLISECONDS);
+if (stillRunning) {
+threadPool.shutdownNow();
+// wait for the thread to shutdown completely.
+while (!threadPool.isTerminated()) {
+Thread.sleep(50);
+}
+throw new IOException(
+"Took too long to split the" + " files and create the references, aborting split");
+}
+} catch (InterruptedException e) {
+throw (InterruptedIOException) new InterruptedIOException().initCause(e);
+}
}

private void deleteDaughterRegions(final MasterProcedureEnv env) throws IOException {
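The daughter-region commits now run on a small "RegionCommitter" pool, bounded by a master-side timeout that falls back to the older RegionServer-era key. A short sketch of how the effective timeout resolves, using the two keys and the 600000 ms (10 minute) default shown in the hunk above:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class SplitTimeoutSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // New master-side key wins; otherwise reuse the legacy RegionServer key; otherwise 10 minutes.
    long fileSplitTimeout = conf.getLong("hbase.master.fileSplitTimeout",
      conf.getLong("hbase.regionserver.fileSplitTimeout", 600000));
    System.out.println("daughter-region commit timeout: " + fileSplitTimeout + " ms");
  }
}
```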
@@ -689,8 +715,8 @@ private void deleteDaughterRegions(final MasterProcedureEnv env) throws IOExcept
* Create Split directory
* @param env MasterProcedureEnv
*/
-private Pair<List<Path>, List<Path>> splitStoreFiles(final MasterProcedureEnv env,
-final HRegionFileSystem regionFs) throws IOException {
+private Pair<List<StoreFileInfo>, List<StoreFileInfo>> splitStoreFiles(
+final MasterProcedureEnv env, final HRegionFileSystem regionFs) throws IOException {
final Configuration conf = env.getMasterConfiguration();
TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
// The following code sets up a thread pool executor with as many slots as
@@ -745,7 +771,8 @@ private Pair<List<Path>, List<Path>> splitStoreFiles(final MasterProcedureEnv en
final ExecutorService threadPool = Executors.newFixedThreadPool(maxThreads,
new ThreadFactoryBuilder().setNameFormat("StoreFileSplitter-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
-final List<Future<Pair<Path, Path>>> futures = new ArrayList<Future<Pair<Path, Path>>>(nbFiles);
+final List<Future<Pair<StoreFileInfo, StoreFileInfo>>> futures =
+new ArrayList<Future<Pair<StoreFileInfo, StoreFileInfo>>>(nbFiles);

// Split each store file.
for (Map.Entry<String, Collection<StoreFileInfo>> e : files.entrySet()) {
Expand Down Expand Up @@ -792,12 +819,12 @@ private Pair<List<Path>, List<Path>> splitStoreFiles(final MasterProcedureEnv en
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
}

-List<Path> daughterA = new ArrayList<>();
-List<Path> daughterB = new ArrayList<>();
+List<StoreFileInfo> daughterA = new ArrayList<>();
+List<StoreFileInfo> daughterB = new ArrayList<>();
// Look for any exception
-for (Future<Pair<Path, Path>> future : futures) {
+for (Future<Pair<StoreFileInfo, StoreFileInfo>> future : futures) {
try {
-Pair<Path, Path> p = future.get();
+Pair<StoreFileInfo, StoreFileInfo> p = future.get();
if (p.getFirst() != null) {
daughterA.add(p.getFirst());
}
@@ -819,6 +846,7 @@ private Pair<List<Path>, List<Path>> splitStoreFiles(final MasterProcedureEnv en
return new Pair<>(daughterA, daughterB);
}

+// TODO: update assert to do SFT.load instead of FileSystem listing
private void assertSplitResultFilesCount(final FileSystem fs,
final int expectedSplitResultFileCount, Path dir) throws IOException {
if (expectedSplitResultFileCount != 0) {
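One hedged reading of the TODO above: instead of listing the daughter directory on the FileSystem, the check could ask each family's StoreFileTracker what it has recorded. A per-family sketch, assuming the same factory overload this procedure already uses elsewhere; the helper name and wiring are illustrative:

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;

public class TrackedFileCountSketch {
  // Sketch only: count what the tracker has persisted for one family of a daughter region.
  static int trackedFileCount(Configuration conf, FileSystem fs, Path tableDir,
      RegionInfo daughter, TableDescriptor htd, ColumnFamilyDescriptor hcd) throws IOException {
    HRegionFileSystem daughterFs =
      HRegionFileSystem.openRegionFromFileSystem(conf, fs, tableDir, daughter, false);
    StoreFileTracker tracker = StoreFileTrackerFactory.create(conf, htd, hcd, daughterFs);
    return tracker.load().size();
  }
}
```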
@@ -830,8 +858,8 @@ private void assertSplitResultFilesCount(final FileSystem fs,
}
}

-private Pair<Path, Path> splitStoreFile(HRegionFileSystem regionFs, TableDescriptor htd,
-ColumnFamilyDescriptor hcd, HStoreFile sf) throws IOException {
+private Pair<StoreFileInfo, StoreFileInfo> splitStoreFile(HRegionFileSystem regionFs,
+TableDescriptor htd, ColumnFamilyDescriptor hcd, HStoreFile sf) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("pid=" + getProcId() + " splitting started for store file: " + sf.getPath()
+ " for region: " + getParentRegion().getShortNameToLog());
@@ -847,22 +875,22 @@ private Pair<Path, Path> splitStoreFile(HRegionFileSystem regionFs, TableDescrip
StoreFileTrackerFactory.create(regionFs.getFileSystem().getConf(), htd, hcd,
HRegionFileSystem.create(regionFs.getFileSystem().getConf(), regionFs.getFileSystem(),
regionFs.getTableDir(), daughterTwoRI));
-final Path path_first = regionFs.splitStoreFile(this.daughterOneRI, familyName, sf, splitRow,
-false, splitPolicy, daughterOneSft);
-final Path path_second = regionFs.splitStoreFile(this.daughterTwoRI, familyName, sf, splitRow,
-true, splitPolicy, daughterTwoSft);
+final StoreFileInfo sfiFirst = regionFs.splitStoreFile(this.daughterOneRI, familyName, sf,
+splitRow, false, splitPolicy, daughterOneSft);
+final StoreFileInfo sfiSecond = regionFs.splitStoreFile(this.daughterTwoRI, familyName, sf,
+splitRow, true, splitPolicy, daughterTwoSft);
if (LOG.isDebugEnabled()) {
LOG.debug("pid=" + getProcId() + " splitting complete for store file: " + sf.getPath()
+ " for region: " + getParentRegion().getShortNameToLog());
}
-return new Pair<Path, Path>(path_first, path_second);
+return new Pair<StoreFileInfo, StoreFileInfo>(sfiFirst, sfiSecond);
}

/**
* Utility class used to do the file splitting / reference writing in parallel instead of
* sequentially.
*/
-private class StoreFileSplitter implements Callable<Pair<Path, Path>> {
+private class StoreFileSplitter implements Callable<Pair<StoreFileInfo, StoreFileInfo>> {
private final HRegionFileSystem regionFs;
private final ColumnFamilyDescriptor hcd;
private final HStoreFile sf;
@@ -883,7 +911,7 @@ public StoreFileSplitter(HRegionFileSystem regionFs, TableDescriptor htd,
}

@Override
-public Pair<Path, Path> call() throws IOException {
+public Pair<StoreFileInfo, StoreFileInfo> call() throws IOException {
return splitStoreFile(regionFs, htd, hcd, sf);
}
}
@@ -102,6 +102,7 @@ public class HMobStore extends HStore {
// table, we need to find the original mob files by this table name. For details please see
// cloning snapshot for mob files.
private final byte[] refCellTags;
+private StoreFileTracker mobStoreSFT = null;

public HMobStore(final HRegion region, final ColumnFamilyDescriptor family,
final Configuration confParam, boolean warmup) throws IOException {
@@ -273,7 +274,36 @@ public void commitFile(final Path sourceFile, Path targetPath) throws IOExceptio
if (!getFileSystem().rename(sourceFile, dstPath)) {
throw new IOException("Failed rename of " + sourceFile + " to " + dstPath);
}
-}
+
+// LOG.info("MOB SFT Debug - Destination path: {}", dstPath);
+// LOG.info("MOB SFT Debug - Tracker class: {}", mobStoreSFT.getClass().getName());
+// mobStoreSFT.add(Collections.singleton(StoreFileInfo.createStoreFileInfoForHFile(conf,
+// getFileSystem(), dstPath, true)));
+}
+
+// private StoreFileTracker getMobStoreSFT() throws IOException {
+// FileSystem fs = getFileSystem();
+// HRegionFileSystem regionFS = (fs.exists(MobUtils.getMobRegionPath(conf,getTableName()))
+// ? HRegionFileSystem.openRegionFromFileSystem(conf, fs, MobUtils.getMobTableDir(conf,
+// getTableName()), MobUtils.getMobRegionInfo(getTableName()),
+// false)
+// : HRegionFileSystem.createRegionOnFileSystem(conf, fs, MobUtils.getMobTableDir(conf,
+// getTableName()), MobUtils.getMobRegionInfo(getTableName())));
+// StoreContext storeContext =
+// StoreContext.getBuilder().withColumnFamilyDescriptor(getStoreContext().getFamily())
+// .withRegionFileSystem(regionFS)
+// .withFamilyStoreDirectoryPath(MobUtils.getMobFamilyPath(conf, getTableName(),
+// getStoreContext().getFamily().getNameAsString()))
+// .withColumnFamilyDescriptor(getStoreContext().getFamily())
+// .build();
+// StoreFileTracker sft = StoreFileTrackerFactory.create(conf, true,storeContext);
+// // *** ADD DEBUG LOGGING ***
+//
+// LOG.info("MOB SFT Debug - Store context family dir: {}",
+// storeContext.getFamilyStoreDirectoryPath());
+// LOG.info("MOB SFT Debug - Region info: {}", storeContext.getRegionInfo());
+// return sft;
+// }

/**
* Validates a mob file by opening and closing it.
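Read together, the new mobStoreSFT field and the commented-out blocks sketch where this draft appears to be heading: lazily build a tracker scoped to the table's single MOB region and record each committed MOB file in it. Below is a tidied, hedged version of that shape, written as it would sit inside HMobStore (conf, getFileSystem(), getTableName(), getStoreContext() and the mobStoreSFT field come from the enclosing class); it mirrors the commented-out draft rather than anything the patch currently enables.

```java
// Hedged sketch only; modeled on the commented-out draft above, not final wiring.
private StoreFileTracker getMobStoreSFT() throws IOException {
  if (mobStoreSFT == null) {
    FileSystem fs = getFileSystem();
    Path mobTableDir = MobUtils.getMobTableDir(conf, getTableName());
    RegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(getTableName());
    // Reuse the MOB region directory if it already exists, otherwise create it.
    HRegionFileSystem regionFs = fs.exists(MobUtils.getMobRegionPath(conf, getTableName()))
      ? HRegionFileSystem.openRegionFromFileSystem(conf, fs, mobTableDir, mobRegionInfo, false)
      : HRegionFileSystem.createRegionOnFileSystem(conf, fs, mobTableDir, mobRegionInfo);
    StoreContext storeContext = StoreContext.getBuilder()
      .withColumnFamilyDescriptor(getStoreContext().getFamily())
      .withRegionFileSystem(regionFs)
      .withFamilyStoreDirectoryPath(MobUtils.getMobFamilyPath(conf, getTableName(),
        getStoreContext().getFamily().getNameAsString()))
      .build();
    mobStoreSFT = StoreFileTrackerFactory.create(conf, true, storeContext);
  }
  return mobStoreSFT;
}

// In commitFile(), once the rename has succeeded, the committed MOB file would be recorded:
//   getMobStoreSFT().add(Collections.singleton(
//     StoreFileInfo.createStoreFileInfoForHFile(conf, getFileSystem(), dstPath, true)));
```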