diff --git a/client/src/main/java/org/apache/nemo/client/JobLauncher.java b/client/src/main/java/org/apache/nemo/client/JobLauncher.java index 6725e65e8b..3f1105571d 100644 --- a/client/src/main/java/org/apache/nemo/client/JobLauncher.java +++ b/client/src/main/java/org/apache/nemo/client/JobLauncher.java @@ -409,6 +409,8 @@ public static Configuration getJobConf(final String[] args) throws IOException, cl.registerShortNameOfClass(JobConf.MaxTaskAttempt.class); cl.registerShortNameOfClass(JobConf.FileDirectory.class); cl.registerShortNameOfClass(JobConf.GlusterVolumeDirectory.class); + cl.registerShortNameOfClass(JobConf.CrailVolumeDirectory.class); + cl.registerShortNameOfClass(JobConf.RemoteFileStoreOpt.class); cl.registerShortNameOfClass(JobConf.PartitionTransportServerPort.class); cl.registerShortNameOfClass(JobConf.PartitionTransportServerBacklog.class); cl.registerShortNameOfClass(JobConf.PartitionTransportServerNumListeningThreads.class); diff --git a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/DataStoreProperty.java b/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/DataStoreProperty.java index c2dc191275..803f4b4c66 100644 --- a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/DataStoreProperty.java +++ b/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/DataStoreProperty.java @@ -51,6 +51,7 @@ public enum Value { MemoryStore, SerializedMemoryStore, LocalFileStore, - GlusterFileStore + GlusterFileStore, + CrailFileStore } } diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/CrailEdgeDataStorePass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/CrailEdgeDataStorePass.java new file mode 100644 index 0000000000..3251ef70e0 --- /dev/null +++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/CrailEdgeDataStorePass.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating; + +import org.apache.nemo.common.ir.IRDAG; +import org.apache.nemo.common.ir.edge.IREdge; +import org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty; +import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires; + +import java.util.List; + +/** + * A pass to support Disaggregated Resources by tagging edges. + * This pass handles the DataStore ExecutionProperty. + */ +@Annotates(DataStoreProperty.class) +@Requires(DataStoreProperty.class) +public final class CrailEdgeDataStorePass extends AnnotatingPass { + /** + * Default constructor. 
+ */ + public CrailEdgeDataStorePass() { + super(CrailEdgeDataStorePass.class); + } + + @Override + public IRDAG apply(final IRDAG dag) { + dag.getVertices().forEach(vertex -> { // Initialize the DataStore of the DAG with CrailFileStore. + final List inEdges = dag.getIncomingEdgesOf(vertex); + inEdges.forEach(edge -> + edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.CrailFileStore))); + }); + return dag; + } +} diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/CrailPolicy.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/CrailPolicy.java new file mode 100644 index 0000000000..dc63896514 --- /dev/null +++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/CrailPolicy.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nemo.compiler.optimizer.policy; + +import org.apache.nemo.common.ir.IRDAG; +import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.*; +import org.apache.nemo.compiler.optimizer.pass.compiletime.composite.DefaultCompositePass; +import org.apache.nemo.compiler.optimizer.pass.runtime.Message; + +/** + * A policy to demonstrate the disaggregation optimization, that uses CrailFS as file storage. + */ +public final class CrailPolicy implements Policy { + public static final PolicyBuilder BUILDER = + new PolicyBuilder() + .registerCompileTimePass(new CrailEdgeDataStorePass()) + .registerCompileTimePass(new DefaultCompositePass()); + private final Policy policy; + + /** + * Default constructor. + */ + public CrailPolicy() { + this.policy = BUILDER.build(); + } + + @Override + public IRDAG runCompileTimeOptimization(final IRDAG dag, final String dagDirectory) { + return this.policy.runCompileTimeOptimization(dag, dagDirectory); + } + + @Override + public IRDAG runRunTimeOptimizations(final IRDAG dag, final Message message) { + return this.policy.runRunTimeOptimizations(dag, message); + } +} diff --git a/conf/src/main/java/org/apache/nemo/conf/JobConf.java b/conf/src/main/java/org/apache/nemo/conf/JobConf.java index 23cf597ddf..cc09f2d5c0 100644 --- a/conf/src/main/java/org/apache/nemo/conf/JobConf.java +++ b/conf/src/main/java/org/apache/nemo/conf/JobConf.java @@ -76,6 +76,21 @@ public final class FileDirectory implements Name { public final class GlusterVolumeDirectory implements Name { } + /** Directory points the CrailFileStore volume used to store files in remote fashion. + * The directory has to be already created to give it as an option. 
(else exception is thrown) + */ + @NamedParameter(doc = "Directory points the CrailFS volume", short_name = "crail_dir", default_value = "") + public final class CrailVolumeDirectory implements Name { + } + + /** + * RemoteFileStore option specification. Two choices are available: GlusterFileStore or CrailFileStore. + * Default is the former one. + */ + @NamedParameter(doc = "Option for RemoteFileStore", short_name = "remote_option", default_value = "Gluster") + public final class RemoteFileStoreOpt implements Name { + } + /** * Specifies the type of the environment the workload runs on. (e.g., transient / large_shuffle) */ diff --git a/pom.xml b/pom.xml index 1d7b43c55d..600826ab31 100644 --- a/pom.xml +++ b/pom.xml @@ -39,7 +39,6 @@ under the License. https://github.com/apache/incubator-nemo/tree/${project.scm.tag} master - 1.8 1.8 @@ -65,6 +64,7 @@ under the License. 3.25.2 42.2.5 1.7.20 + 1.1-incubating 2.13.0 2.0.0-beta.5 @@ -93,6 +93,56 @@ under the License. snappy-java 1.1.1.3 + + org.apache.crail + crail-storage + ${crail.version} + + + org.apache.crail + crail-storage-narpc + ${crail.version} + + + org.apache.crail + crail-storage-rdma + ${crail.version} + + + org.apache.crail + crail-storage-nvmf + ${crail.version} + + + org.apache.crail + crail-rpc + ${crail.version} + + + org.apache.crail + crail-rpc-narpc + ${crail.version} + + + org.apache.crail + crail-hdfs + ${crail.version} + + + org.apache.crail + crail-namenode + ${crail.version} + + + org.apache.crail + crail-rpc + ${crail.version} + + + org.apache.crail + crail-client + ${crail.version} + junit diff --git a/runtime/driver/src/main/java/org/apache/nemo/driver/NemoDriver.java b/runtime/driver/src/main/java/org/apache/nemo/driver/NemoDriver.java index afca55aef0..61abfeaa20 100644 --- a/runtime/driver/src/main/java/org/apache/nemo/driver/NemoDriver.java +++ b/runtime/driver/src/main/java/org/apache/nemo/driver/NemoDriver.java @@ -28,6 +28,9 @@ import org.apache.nemo.runtime.common.comm.ControlMessage; import org.apache.nemo.runtime.common.message.ClientRPC; import org.apache.nemo.runtime.common.message.MessageParameters; +import org.apache.nemo.runtime.executor.data.stores.CrailFileStore; +import org.apache.nemo.runtime.executor.data.stores.RemoteFileStore; + import org.apache.nemo.runtime.master.BroadcastManagerMaster; import org.apache.nemo.runtime.master.RuntimeMaster; import org.apache.reef.annotations.audience.DriverSide; @@ -43,6 +46,7 @@ import org.apache.reef.io.network.util.StringIdentifierFactory; import org.apache.reef.tang.Configuration; import org.apache.reef.tang.Configurations; +import org.apache.reef.tang.JavaConfigurationBuilder; import org.apache.reef.tang.Tang; import org.apache.reef.tang.annotations.Parameter; import org.apache.reef.tang.annotations.Unit; @@ -80,6 +84,7 @@ public final class NemoDriver { private final String localDirectory; private final String glusterDirectory; private final ClientRPC clientRPC; + private final String remoteOpt; private static ExecutorService runnerThread = Executors.newSingleThreadExecutor( new BasicThreadFactory.Builder().namingPattern("User App thread-%d").build()); @@ -98,7 +103,8 @@ private NemoDriver(final UserApplicationRunner userApplicationRunner, @Parameter(JobConf.BandwidthJSONContents.class) final String bandwidthString, @Parameter(JobConf.JobId.class) final String jobId, @Parameter(JobConf.FileDirectory.class) final String localDirectory, - @Parameter(JobConf.GlusterVolumeDirectory.class) final String glusterDirectory) { + 
@Parameter(JobConf.GlusterVolumeDirectory.class) final String glusterDirectory, + @Parameter(JobConf.RemoteFileStoreOpt.class) final String remoteOpt) { IdManager.setInDriver(); this.userApplicationRunner = userApplicationRunner; this.runtimeMaster = runtimeMaster; @@ -110,6 +116,7 @@ private NemoDriver(final UserApplicationRunner userApplicationRunner, this.glusterDirectory = glusterDirectory; this.handler = new RemoteClientMessageLoggingHandler(client); this.clientRPC = clientRPC; + this.remoteOpt = remoteOpt; // TODO #69: Support job-wide execution property ResourceSitePass.setBandwidthSpecificationString(bandwidthString); clientRPC.registerHandler(ControlMessage.ClientToDriverMessageType.Notification, this::handleNotification); @@ -251,6 +258,10 @@ private Configuration getExecutorConfiguration(final String executorId) { .set(JobConf.JOB_ID, jobId) .build(); + final JavaConfigurationBuilder jcb = Tang.Factory.getTang().newConfigurationBuilder(); + if (remoteOpt.equals("crail")) jcb.bindImplementation(RemoteFileStore.class, CrailFileStore.class); + final Configuration remoteConf = jcb.build(); + final Configuration contextConfiguration = ContextConfiguration.CONF .set(ContextConfiguration.IDENTIFIER, executorId) // We set: contextId = executorId .set(ContextConfiguration.ON_CONTEXT_STARTED, NemoContext.ContextStartHandler.class) @@ -260,7 +271,7 @@ private Configuration getExecutorConfiguration(final String executorId) { final Configuration ncsConfiguration = getExecutorNcsConfiguration(); final Configuration messageConfiguration = getExecutorMessageConfiguration(executorId); - return Configurations.merge(executorConfiguration, contextConfiguration, ncsConfiguration, messageConfiguration); + return Configurations.merge(executorConfiguration, contextConfiguration, ncsConfiguration, messageConfiguration, remoteConf); } private Configuration getExecutorNcsConfiguration() { diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java index 8806432190..661496bb95 100644 --- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java +++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java @@ -268,7 +268,8 @@ public void writeBlock(final Block block, .setBlockId(blockId) .setState(ControlMessage.BlockStateFromExecutor.AVAILABLE); - if (DataStoreProperty.Value.GlusterFileStore.equals(blockStore)) { + if (DataStoreProperty.Value.GlusterFileStore.equals(blockStore) + || DataStoreProperty.Value.CrailFileStore.equals(blockStore)) { blockStateChangedMsgBuilder.setLocation(REMOTE_FILE_STORE); } else { blockStateChangedMsgBuilder.setLocation(executorId); @@ -302,7 +303,8 @@ public void removeBlock(final String blockId, .setBlockId(blockId) .setState(ControlMessage.BlockStateFromExecutor.NOT_AVAILABLE); - if (DataStoreProperty.Value.GlusterFileStore.equals(blockStore)) { + if (DataStoreProperty.Value.GlusterFileStore.equals(blockStore) + || DataStoreProperty.Value.CrailFileStore.equals(blockStore)) { blockStateChangedMsgBuilder.setLocation(REMOTE_FILE_STORE); } else { blockStateChangedMsgBuilder.setLocation(executorId); @@ -345,7 +347,8 @@ public void run() { final Optional optionalBlock = getBlockStore(blockStore).readBlock(blockId); if (optionalBlock.isPresent()) { if (DataStoreProperty.Value.LocalFileStore.equals(blockStore) - || 
DataStoreProperty.Value.GlusterFileStore.equals(blockStore)) { + || DataStoreProperty.Value.GlusterFileStore.equals(blockStore) + || DataStoreProperty.Value.CrailFileStore.equals(blockStore)) { final List fileAreas = ((FileBlock) optionalBlock.get()).asFileAreas(keyRange); for (final FileArea fileArea : fileAreas) { try (ByteOutputContext.ByteOutputStream os = outputContext.newOutputStream()) { @@ -420,7 +423,6 @@ private CompletableFuture getDataFromLocalBlock( numSerializedBytes += partition.getNumSerializedBytes(); numEncodedBytes += partition.getNumEncodedBytes(); } - return CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(innerIterator, numSerializedBytes, numEncodedBytes)); } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) { @@ -476,6 +478,8 @@ private BlockStore getBlockStore(final DataStoreProperty.Value blockStore) { case LocalFileStore: return localFileStore; case GlusterFileStore: + return localFileStore; + case CrailFileStore: return remoteFileStore; default: throw new UnsupportedBlockStoreException(new Exception(blockStore + " is not supported.")); @@ -499,6 +503,8 @@ private static ControlMessage.BlockStore convertBlockStore( case LocalFileStore: return ControlMessage.BlockStore.LOCAL_FILE; case GlusterFileStore: + return ControlMessage.BlockStore.LOCAL_FILE; //since it is treated the same way as LOCAL_FILE + case CrailFileStore: return ControlMessage.BlockStore.REMOTE_FILE; default: throw new UnsupportedBlockStoreException(new Exception(blockStore + " is not supported.")); @@ -522,7 +528,7 @@ private static DataStoreProperty.Value convertBlockStore( case LOCAL_FILE: return DataStoreProperty.Value.LocalFileStore; case REMOTE_FILE: - return DataStoreProperty.Value.GlusterFileStore; + return DataStoreProperty.Value.CrailFileStore; default: throw new UnsupportedBlockStoreException(new Exception("This block store is not yet supported")); } diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/FileBlock.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/FileBlock.java index 65b167b0e9..48e6b938b3 100644 --- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/FileBlock.java +++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/FileBlock.java @@ -18,6 +18,7 @@ */ package org.apache.nemo.runtime.executor.data.block; +import org.apache.crail.*; import org.apache.nemo.common.KeyRange; import org.apache.nemo.common.Pair; import org.apache.nemo.common.exception.BlockFetchException; @@ -33,15 +34,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; -import java.io.ByteArrayInputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.Serializable; +import java.io.*; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.Files; @@ -66,6 +62,10 @@ public final class FileBlock implements Block { private final Serializer serializer; private final String filePath; private final FileMetadata metadata; + private final Boolean crail; + @Nullable + private final CrailStore fs; + private CrailFile file; // not final since fetching the File might fail. /** * Constructor. 
@@ -84,24 +84,69 @@ public FileBlock(final String blockId, this.serializer = serializer; this.filePath = filePath; this.metadata = metadata; + this.crail = false; + this.fs = null; + this.file = null; + } + + /** + * Constructor for FileBlock that is being written in CrailFileStore. + * + * @param fs the {@link CrailStore} object created from {@link org.apache.crail.conf.CrailConfiguration} + */ + public FileBlock(final String blockId, + final Serializer serializer, + final String filePath, + final FileMetadata metadata, + final CrailStore fs) { + this.id = blockId; + this.nonCommittedPartitionsMap = new HashMap<>(); + this.serializer = serializer; + this.filePath = filePath; + this.metadata = metadata; + this.crail = true; + this.fs = fs; + try { + this.file = fs.create(filePath, CrailNodeType.DATAFILE, CrailStorageClass.DEFAULT, + CrailLocationClass.DEFAULT, true).get().asFile(); + file.syncDir(); + } catch (Exception e1) { + try { + this.file = fs.lookup(filePath).get().asFile(); + } catch (Exception e2) { + LOG.info("{} fetch failed", blockId); + throw new RuntimeException(); + } + } } /** * Writes the serialized data of this block having a specific key value as a partition to the file - * where this block resides. + * where this block resides. Supports both writing either in local file system or CrailFileSystem. * Invariant: This method does not support concurrent write. * * @param serializedPartitions the iterable of the serialized partitions to write. - * @throws IOException if fail to write. + * @throws Exception if fail to write. */ - private void writeToFile(final Iterable> serializedPartitions) - throws IOException { - try (FileChannel fileOutputChannel = new FileOutputStream(filePath, true).getChannel()) { - for (final SerializedPartition serializedPartition : serializedPartitions) { - // Reserve a partition write and get the metadata. - metadata.writePartitionMetadata(serializedPartition.getKey(), serializedPartition.getLength()); - for (final ByteBuffer buffer: serializedPartition.getDirectBufferList()) { - fileOutputChannel.write(buffer); + private void writeToFile(final Iterable> serializedPartitions) throws Exception { + if (crail) { + try (CrailBufferedOutputStream fileOutputStream = file.getBufferedOutputStream(0)) { + for (final SerializedPartition serializedPartition : serializedPartitions) { + // Reserve a partition write and get the metadata. + metadata.writePartitionMetadata(serializedPartition.getKey(), serializedPartition.getLength()); + for (final ByteBuffer buffer: serializedPartition.getDirectBufferList()) { + fileOutputStream.write(buffer); + } + } + } + } else { + try (FileChannel fileOutputChannel = new FileOutputStream(filePath, true).getChannel()) { + for (final SerializedPartition serializedPartition : serializedPartitions) { + // Reserve a partition write and get the metadata. 
+ metadata.writePartitionMetadata(serializedPartition.getKey(), serializedPartition.getLength()); + for (final ByteBuffer buffer: serializedPartition.getDirectBufferList()) { + fileOutputChannel.write(buffer); + } } } } @@ -173,7 +218,7 @@ public void writeSerializedPartitions(final Iterable> par } else { try { writeToFile(partitions); - } catch (final IOException e) { + } catch (final Exception e) { throw new BlockWriteException(e); } } @@ -195,7 +240,8 @@ public Iterable> readPartitions(final KeyRange keyRang final List> deserializedPartitions = new ArrayList<>(); try { final List> partitionKeyBytesPairs = new ArrayList<>(); - try (FileInputStream fileStream = new FileInputStream(filePath)) { + try (InputStream fileStream = crail + ? file.getBufferedInputStream(file.getCapacity()) : new FileInputStream(filePath)) { for (final PartitionMetadata partitionMetadata : metadata.getPartitionMetadataList()) { final K key = partitionMetadata.getKey(); if (keyRange.includes(key)) { @@ -216,8 +262,10 @@ public Iterable> readPartitions(final KeyRange keyRang new ByteArrayInputStream(partitionKeyBytes.right())); deserializedPartitions.add(deserializePartition); } - } catch (final IOException e) { - throw new BlockFetchException(e); + } catch (final IOException e1) { + throw new BlockFetchException(e1); + } catch (final Exception e2) { + throw new BlockFetchException(e2); // TODO: throw a more specific exception } return deserializedPartitions; @@ -240,7 +288,8 @@ public Iterable> readSerializedPartitions(final KeyRange // Deserialize the data final List> partitionsInRange = new ArrayList<>(); try { - try (FileInputStream fileStream = new FileInputStream(filePath)) { + try (InputStream fileStream = crail + ? file.getBufferedInputStream(file.getCapacity()) : new FileInputStream(filePath)) { for (final PartitionMetadata partitionmetadata : metadata.getPartitionMetadataList()) { final K key = partitionmetadata.getKey(); if (keyRange.includes(key)) { @@ -258,8 +307,10 @@ public Iterable> readSerializedPartitions(final KeyRange } } } - } catch (final IOException e) { - throw new BlockFetchException(e); + } catch (final IOException e1) { + throw new BlockFetchException(e1); + } catch (final Exception e2) { + throw new BlockFetchException(e2); // TODO: throw a more specific exception } return partitionsInRange; @@ -314,8 +365,21 @@ public List asFileAreas(final KeyRange keyRange) throws IOException { */ public void deleteFile() throws IOException { metadata.deleteMetadata(); - if (new File(filePath).exists()) { - Files.delete(Paths.get(filePath)); + if (!crail) { + if (new File(filePath).exists()) { + Files.delete(Paths.get(filePath)); + } + } else { + try { + if (fs.lookup(filePath).get() != null) { + fs.delete(filePath, true); + } + } catch (IOException e) { + e.printStackTrace(); + } catch (Exception e) { + LOG.info("Failed to delete file"); + e.printStackTrace(); + } } } @@ -366,6 +430,9 @@ public synchronized void commitPartitions() throws BlockWriteException { nonCommittedPartitionsMap.clear(); } catch (final IOException e) { throw new BlockWriteException(e); + } catch (Exception e) { + // TODO: throw a more specific exception + throw new BlockWriteException(e); } } diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/metadata/CrailFileMetadata.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/metadata/CrailFileMetadata.java new file mode 100644 index 0000000000..8832c891f4 --- /dev/null +++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/metadata/CrailFileMetadata.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nemo.runtime.executor.data.metadata; + +import org.apache.commons.lang3.SerializationUtils; +import org.apache.crail.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.ThreadSafe; +import java.io.*; +import java.util.ArrayList; +import java.util.List; + +/** + * This class represents a metadata for a file block in CrailFileSystem. + * Because the data is stored in a CrailFileSystem and globally accessed by multiple nodes, + * each read, or deletion for a block needs one instance of this metadata. + * The metadata is stored in and read from a CrailFile (after a CrailFile block is committed). + * @param the key type of its partitions. + */ +@ThreadSafe +public final class CrailFileMetadata extends FileMetadata { + private static final Logger LOG = LoggerFactory.getLogger(CrailFileMetadata.class.getName()); + private final String metaFilePath; + private static CrailStore fs; + + /** + * Constructor for creating a non-committed new CrailFile metadata. + * + * @param metaFilePath the metadata file path. + * @param fs the CrailStore instance. + */ + private CrailFileMetadata(final String metaFilePath, final CrailStore fs) { + super(); + this.metaFilePath = metaFilePath; + this.fs = fs; + } + + /** + * Constructor for opening a existing CrailFile metadata. + * + * @param metaFilePath the metadata file path. + * @param partitionMetadataList the partition metadata list. + * @param fs the CrailStore instance. + */ + private CrailFileMetadata(final String metaFilePath, + final List> partitionMetadataList, final CrailStore fs) { + super(partitionMetadataList); + this.metaFilePath = metaFilePath; + this.fs = fs; + } + + /** + * @see FileMetadata#deleteMetadata() + */ + @Override + public void deleteMetadata() { + try { + fs.delete(metaFilePath, true).get().syncDir(); + } catch (Exception e) { + LOG.info("Metadata deletion failed"); + e.printStackTrace(); + } + } + + /** + * Write the collected {@link PartitionMetadata}s to the metadata file. + * Notifies that all writes are finished for the block corresponding to this metadata. 
+ */ + @Override + public synchronized void commitBlock() throws IOException { + final Iterable> partitionMetadataItr = getPartitionMetadataList(); + try { + CrailBufferedOutputStream metaFileOutputstream = + fs.create(metaFilePath, CrailNodeType.DATAFILE, CrailStorageClass.DEFAULT, CrailLocationClass.DEFAULT, true) + .get().asFile().getBufferedOutputStream(0); + for (PartitionMetadata partitionMetadata : partitionMetadataItr) { + final byte[] key = SerializationUtils.serialize(partitionMetadata.getKey()); + metaFileOutputstream.writeInt(key.length); + metaFileOutputstream.write(key); + metaFileOutputstream.writeInt(partitionMetadata.getPartitionSize()); + metaFileOutputstream.writeLong(partitionMetadata.getOffset()); + } + metaFileOutputstream.close(); + } catch (Exception e) { + LOG.info("Error while writing meta data"); + e.printStackTrace(); + } + setCommitted(true); + } + + /** + * Creates a new block metadata. + * + * @param metaFilePath the path of the file to write metadata. + * @param cs the CrailStore instance. + * @param the key type of the block's partitions. + * @return the created block metadata. + */ + public static CrailFileMetadata create(final String metaFilePath, final CrailStore cs) { + return new CrailFileMetadata<>(metaFilePath, cs); + } + + /** + * Opens a existing block metadata in file. + * + * @param metaFilePath the path of the file to write metadata. + * @param cs the CrailStore instance + * @param the key type of the block's partitions. + * @return the created block metadata. + * @throws IOException if fail to open. + */ + public static CrailFileMetadata open(final String metaFilePath, + final CrailStore cs) throws IOException { + final List> partitionMetadataList = new ArrayList<>(); + try { + CrailBufferedInputStream dataInputStream = cs.lookup(metaFilePath).get().asFile().getBufferedInputStream(0); + while (dataInputStream.available() > 0) { + final int keyLength = dataInputStream.readInt(); + final byte[] desKey = new byte[keyLength]; + if (keyLength != dataInputStream.read(desKey)) { + throw new IOException("Invalid key length!"); + } + + final PartitionMetadata partitionMetadata = new PartitionMetadata<>( + SerializationUtils.deserialize(desKey), + dataInputStream.readInt(), + dataInputStream.readLong() + ); + partitionMetadataList.add(partitionMetadata); + } + } catch (Exception e) { + throw new IOException("Metadata " + metaFilePath + " does not exist!"); + } + return new CrailFileMetadata<>(metaFilePath, partitionMetadataList, cs); + } +} diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/CrailFileStore.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/CrailFileStore.java new file mode 100644 index 0000000000..8c39872b00 --- /dev/null +++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/CrailFileStore.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nemo.runtime.executor.data.stores; + +import org.apache.crail.*; +import org.apache.crail.conf.CrailConfiguration; +import org.apache.nemo.common.exception.BlockFetchException; +import org.apache.nemo.conf.JobConf; +import org.apache.nemo.common.exception.BlockWriteException; +import org.apache.nemo.runtime.executor.data.*; +import org.apache.nemo.runtime.executor.data.block.Block; +import org.apache.nemo.runtime.executor.data.block.FileBlock; +import org.apache.nemo.runtime.executor.data.metadata.CrailFileMetadata; +import org.apache.nemo.runtime.executor.data.streamchainer.Serializer; +import org.apache.reef.tang.annotations.Parameter; + +import javax.annotation.concurrent.ThreadSafe; +import javax.inject.Inject; +import java.io.IOException; +import java.io.Serializable; +import java.util.Optional; + +/** + * Stores blocks in CrailStore. + * Since the data is stored in CrailStore and globally accessed by multiple nodes, + * each read, or deletion for a file needs one instance of {@link FileBlock}. + * When FileBlock in Crail is created, it's metadata is maintained in memory until the block is committed. + * After the block is committed, the metadata is stored in and read from a CrailStore. + */ +@ThreadSafe +public final class CrailFileStore extends AbstractBlockStore implements RemoteFileStore { + private final String fileDirectory; + private final CrailConfiguration conf; + private final CrailStore fs; + + /** + * Constructor. + * + * @param volumeDirectory the CrailStore directory where we contain the files. + * @param jobId the job id. + * @param serializerManager the serializer manager. + * @throws Exception for any error occurred while trying to set Crail requirements. + */ + @Inject + private CrailFileStore(@Parameter(JobConf.CrailVolumeDirectory.class) final String volumeDirectory, + @Parameter(JobConf.JobId.class) final String jobId, + final SerializerManager serializerManager) throws Exception { + super(serializerManager); + this.conf = new CrailConfiguration(); + this.fs = CrailStore.newInstance(conf); + this.fileDirectory = volumeDirectory; + } + + @Override + public Block createBlock(final String blockId) { + deleteBlock(blockId); + final Serializer serializer = getSerializerFromWorker(blockId); + final String filePath = DataUtil.blockIdToFilePath(blockId, fileDirectory); + final String metaPath = DataUtil.blockIdToMetaFilePath(blockId, fileDirectory); + final CrailFileMetadata metadata = CrailFileMetadata.create(metaPath, fs); + return new FileBlock<>(blockId, serializer, filePath, metadata, fs); + } + + /** + * Writes a committed block to this store. + * + * @param block the block to write. + * @throws BlockWriteException if fail to write. 
+ */ + + @Override + public void writeBlock(final Block block) throws BlockWriteException { + if (!(block instanceof FileBlock)) { + throw new BlockWriteException(new Throwable( + this.toString() + " only accept " + FileBlock.class.getName())); + } else if (!block.isCommitted()) { + throw new BlockWriteException(new Throwable("The block " + block.getId() + "is not committed yet.")); + } + // Do nothing. The block have to be written in CrailStore file during commit. + } + + /** + * Reads a committed block from this store. + * + * @param blockId of the target partition. + * @return the target block (if it exists). + * @throws BlockFetchException for any error occurred while trying to fetch a block. + */ + + public Optional readBlock(final String blockId) throws BlockFetchException { + final String filePath = DataUtil.blockIdToFilePath(blockId, fileDirectory); + try { + if (fs.lookup(filePath).get() == null) { + return Optional.empty(); + } else { + try { + final FileBlock block = getBlockFromFile(blockId); + return Optional.of(block); + } catch (final IOException e) { + throw new BlockFetchException(e); + } catch (Exception e) { + throw new BlockFetchException(e); + } + } + } catch (Exception e) { + throw new BlockFetchException(e); + } + } + + /** + * Removes the file that the target block is stored. + * + * @param blockId of the block. + * @return whether the block exists or not. + */ + @Override + public boolean deleteBlock(final String blockId) throws BlockFetchException { + final String filePath = DataUtil.blockIdToFilePath(blockId, fileDirectory); + + try { + if (fs.lookup(filePath).get() != null) { + final FileBlock block = getBlockFromFile(blockId); + block.deleteFile(); + return true; + } else { + return false; + } + } catch (final IOException e) { + throw new BlockFetchException(e); + } catch (final Exception e) { + throw new BlockFetchException(e); + } + } + + /** + * Gets a {@link FileBlock} from the block and it's metadata file. + * Because the data is stored in CrailStore and globally accessed by multiple nodes, + * each read, or deletion for a file needs one instance of {@link FileBlock}, + * and the temporary block will not be maintained by this executor. + * + * @param blockId the ID of the block to get. + * @param the type of the key of the block. + * @return the {@link FileBlock} gotten. + * @throws Exception if fail to get. + */ + private FileBlock getBlockFromFile(final String blockId) throws Exception { + final Serializer serializer = getSerializerFromWorker(blockId); + final String filePath = DataUtil.blockIdToFilePath(blockId, fileDirectory); + final CrailFileMetadata metadata = + CrailFileMetadata.open(DataUtil.blockIdToMetaFilePath(blockId, fileDirectory), fs); + return new FileBlock<>(blockId, serializer, filePath, metadata, fs); + } +} diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/RemoteFileStore.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/RemoteFileStore.java index a157f54c3d..7fa6189fcc 100644 --- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/RemoteFileStore.java +++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/RemoteFileStore.java @@ -21,8 +21,9 @@ import org.apache.reef.tang.annotations.DefaultImplementation; /** - * Interface for remote block stores (e.g., GlusterFS, ...). + * Interface for remote block stores (e.g., GlusterFS, CrailFS...). 
*/ + @DefaultImplementation(GlusterFileStore.class) public interface RemoteFileStore extends BlockStore { } diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/BlockStoreTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/BlockStoreTest.java index 457960fb0f..55bd1fdfb9 100644 --- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/BlockStoreTest.java +++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/BlockStoreTest.java @@ -57,6 +57,7 @@ import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.concurrent.*; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
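For reference, a minimal sketch of how the two options introduced in JobConf could be bound when assembling a job configuration. JobLauncher normally parses them from the command line through the short names registered in this patch ("crail_dir" and "remote_option"); the volume path below is an assumed, pre-created Crail directory, and the "crail" literal matches the check added to NemoDriver. The class and method names are illustrative, not part of the patch.

import org.apache.nemo.conf.JobConf;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.JavaConfigurationBuilder;
import org.apache.reef.tang.Tang;

final class CrailJobConfSketch {
  private CrailJobConfSketch() {
  }

  // Binds the new named parameters directly instead of going through the command line.
  static Configuration crailJobConf() {
    final JavaConfigurationBuilder jcb = Tang.Factory.getTang().newConfigurationBuilder();
    jcb.bindNamedParameter(JobConf.CrailVolumeDirectory.class, "/nemo"); // assumed pre-created Crail volume
    jcb.bindNamedParameter(JobConf.RemoteFileStoreOpt.class, "crail");   // value checked in NemoDriver
    return jcb.build();
  }
}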
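On the executor side, selecting the remote store is a plain Tang implementation binding layered on top of the @DefaultImplementation(GlusterFileStore.class) default on RemoteFileStore. A minimal sketch of the branch added to NemoDriver#getExecutorConfiguration (the helper class name is illustrative); the resulting Configuration is merged into the executor configuration via Configurations.merge, as in the patch.

import org.apache.nemo.runtime.executor.data.stores.CrailFileStore;
import org.apache.nemo.runtime.executor.data.stores.RemoteFileStore;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.JavaConfigurationBuilder;
import org.apache.reef.tang.Tang;

final class RemoteStoreBindingSketch {
  private RemoteStoreBindingSketch() {
  }

  // Bind CrailFileStore only when the option is "crail"; otherwise the
  // @DefaultImplementation on RemoteFileStore keeps GlusterFileStore.
  static Configuration remoteStoreConf(final String remoteOpt) {
    final JavaConfigurationBuilder jcb = Tang.Factory.getTang().newConfigurationBuilder();
    if ("crail".equals(remoteOpt)) {
      jcb.bindImplementation(RemoteFileStore.class, CrailFileStore.class);
    }
    return jcb.build();
  }
}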
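The new FileBlock constructor, CrailFileMetadata and CrailFileStore all follow the same Crail usage pattern: create (or look up) a CrailFile, stream bytes through the buffered Crail streams, and delete through the CrailStore handle. A condensed sketch of that pattern, using only the calls that appear in this patch; the path, payload and class name are illustrative, and real paths come from DataUtil.blockIdToFilePath.

import org.apache.crail.*;
import org.apache.crail.conf.CrailConfiguration;

final class CrailRoundTripSketch {
  private CrailRoundTripSketch() {
  }

  static void roundTrip(final String filePath, final byte[] payload) throws Exception {
    final CrailStore fs = CrailStore.newInstance(new CrailConfiguration());

    // Create the file, or fall back to lookup if it already exists, as FileBlock's Crail constructor does.
    CrailFile file;
    try {
      file = fs.create(filePath, CrailNodeType.DATAFILE, CrailStorageClass.DEFAULT,
        CrailLocationClass.DEFAULT, true).get().asFile();
      file.syncDir();
    } catch (Exception e) {
      file = fs.lookup(filePath).get().asFile();
    }

    // Write, as FileBlock#writeToFile and CrailFileMetadata#commitBlock do.
    try (CrailBufferedOutputStream out = file.getBufferedOutputStream(0)) {
      out.write(payload);
    }

    // Read back, as FileBlock#readPartitions does.
    try (CrailBufferedInputStream in = file.getBufferedInputStream(file.getCapacity())) {
      final byte[] buf = new byte[payload.length];
      final int bytesRead = in.read(buf);
    }

    // Delete, as FileBlock#deleteFile does.
    if (fs.lookup(filePath).get() != null) {
      fs.delete(filePath, true);
    }
  }
}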