-
Notifications
You must be signed in to change notification settings - Fork 64
[NEMO-351] Empowering Nemo with fast I/O using Apache Crail #219
base: master
Are you sure you want to change the base?
Changes from all commits
957a7b9
1c4b1bb
a43c842
54a6577
13da157
427db91
f6e3270
9fc8f8c
a7b9635
1e6e40a
cdd20b0
3a40a3c
40c1077
16aa04e
70e0053
d32ae05
1bec17e
0e58793
5031a1b
bc7edfc
abb4969
1127cf7
bd6bc82
8c7b26a
59e8aea
bb88241
a33c60d
105b33e
5698eff
079c7d7
85f4d49
73e5b4f
6fe0b5f
73572e9
a9d29b3
4612dcb
4a98394
a1d45bc
4bf5b18
3628b46
163a490
813bc9b
3de3676
80896a3
c631322
3559d3e
72d408c
9d812e0
f858e7c
f7be76d
ac06a43
c68cf24
ad3abb6
0a23ccf
fdbca0d
44371db
ef6004a
36cb453
bf34b26
b8dbfc0
6e4961d
70f53d2
8e8748f
420f8c2
eb7c7d9
e17c9ca
f5d972f
4bba466
31f17ac
78a8cae
18c7f7e
0b7e11d
915bba8
9876826
fb7ae5c
1dcde81
3ca9c75
e021878
c22fd4a
e4d5eb8
50a99ca
e698e98
a10bf8a
5b9f74a
911ccf3
bbc31d6
245244b
aaeb8dc
e28b41c
0a74406
f5a1edb
b48869d
cca426f
cb636b6
5dd038e
345fd8a
77761b7
85b0418
b0642ac
5aabfe7
a408885
d78979f
42e62c7
f8872cb
03d0552
46617ed
27e79f0
96e74b8
7e57edc
cd54784
c2be06d
a3d4f80
cc1b147
e62fbc7
829eb57
e93871f
ab84f1a
c4606f1
61561c1
7d006d8
eac61e1
8ea5cf5
fb7b74e
29a35a6
93efe7b
ed2a348
3133ee8
8edcb4d
f66b0e5
63781b5
2ed63db
d40eb47
ec45fbe
20d77a8
c3327cf
c4a23c2
bf04595
4e5dfcf
296a22d
9db6292
7aea8b7
fe6e5d0
231cddf
b1e4ab0
dd4066b
2215fef
bf74470
5842d92
fdc2f2b
6b673a8
836b8cc
98f8c62
bf0a06e
5cab207
b7d3216
8eb47d9
db3ea06
2da390f
34d5809
7138336
533f342
984f249
00c2ab4
bf3e5a2
6345cbf
98d8f37
5fe266f
cbc289b
8012c2b
4205379
e76d33d
3d18ce9
dc28f5d
77d8dba
fdd36db
0fe8691
260f423
c74630e
75cef7a
a308f1f
6e9b9ae
83bb13c
5fbc237
234af78
6ecbb63
865ef6f
e449ebc
e8b62e0
e828a6b
abca847
62cc257
26bd24f
8aa0a20
48a25be
ab4cdf8
1bf866e
1d5a6b1
a1ae1bf
581006c
1bb1d9b
284fa86
fa43b9a
76f34ca
108466e
185b32e
a25eef2
d57c442
c6ae2ef
8d5e21a
94e1928
e04d02b
76c0e51
a6a15f4
5cd3690
31a3456
23dbd82
0d84962
cc8827e
d319465
f141d7b
2f50495
47ee30c
5d3d7fd
7468fdd
16de92e
ba15a5e
bd54584
795da83
ae04605
d54ba3c
f838cdc
ac40d0d
231130c
0a0b62c
4d0eb21
0e00fb6
d1c9304
5b1a71b
4a0b3ea
29e708f
f154cd7
4cf9c32
35a6685
707847a
c637f34
c9f8b2c
949e248
3ea30cd
1ec909e
a6ed1d6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating; | ||
|
||
import org.apache.nemo.common.ir.IRDAG; | ||
import org.apache.nemo.common.ir.edge.IREdge; | ||
import org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty; | ||
import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires; | ||
|
||
import java.util.List; | ||
|
||
/** | ||
* An annotating pass that supports disaggregated resources by tagging edges. | ||
* It sets the DataStore ExecutionProperty of every incoming edge of every vertex | ||
* in the DAG to CrailFileStore. | ||
*/ | ||
@Annotates(DataStoreProperty.class) | ||
@Requires(DataStoreProperty.class) | ||
public final class CrailEdgeDataStorePass extends AnnotatingPass { | ||
/** | ||
* Default constructor; hands this pass's own class to the AnnotatingPass constructor. | ||
*/ | ||
public CrailEdgeDataStorePass() { | ||
super(CrailEdgeDataStorePass.class); | ||
} | ||
|
||
// Visits every vertex of the given DAG and sets DataStoreProperty = CrailFileStore on each | ||
// of its incoming edges, then returns the same DAG instance it was given. | ||
// NOTE(review): the property is set with setPropertyPermanently; presumably this keeps | ||
// later passes from overriding it -- confirm against IREdge. | ||
@Override | ||
public IRDAG apply(final IRDAG dag) { | ||
dag.getVertices().forEach(vertex -> { // Tag all incoming edges of this vertex with CrailFileStore. | ||
final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex); | ||
inEdges.forEach(edge -> | ||
edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.CrailFileStore))); | ||
}); | ||
return dag; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.nemo.compiler.optimizer.policy; | ||
|
||
import org.apache.nemo.common.ir.IRDAG; | ||
import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.*; | ||
import org.apache.nemo.compiler.optimizer.pass.compiletime.composite.DefaultCompositePass; | ||
import org.apache.nemo.compiler.optimizer.pass.runtime.Message; | ||
|
||
/** | ||
* A policy that demonstrates the disaggregation optimization, using CrailFS as file storage. | ||
* Both optimization methods delegate to a policy built from {@code BUILDER}. | ||
*/ | ||
public final class CrailPolicy implements Policy { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add an integration test to test this policy? |
||
// Builder holding the compile-time passes of this policy, registered in this order: | ||
// CrailEdgeDataStorePass first, then DefaultCompositePass. | ||
public static final PolicyBuilder BUILDER = | ||
new PolicyBuilder() | ||
.registerCompileTimePass(new CrailEdgeDataStorePass()) | ||
.registerCompileTimePass(new DefaultCompositePass()); | ||
// The policy built from BUILDER; every call on this class is forwarded to it. | ||
private final Policy policy; | ||
|
||
/** | ||
* Default constructor; builds the wrapped policy from {@code BUILDER}. | ||
*/ | ||
public CrailPolicy() { | ||
this.policy = BUILDER.build(); | ||
} | ||
|
||
// Forwards compile-time optimization of the DAG to the wrapped policy and returns its result. | ||
@Override | ||
public IRDAG runCompileTimeOptimization(final IRDAG dag, final String dagDirectory) { | ||
return this.policy.runCompileTimeOptimization(dag, dagDirectory); | ||
} | ||
|
||
// Forwards run-time optimization (driven by the given message) to the wrapped policy. | ||
@Override | ||
public IRDAG runRunTimeOptimizations(final IRDAG dag, final Message<?> message) { | ||
return this.policy.runRunTimeOptimizations(dag, message); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -76,6 +76,21 @@ public final class FileDirectory implements Name<String> { | |
public final class GlusterVolumeDirectory implements Name<String> { | ||
} | ||
|
||
/** Path of the CrailFileStore volume directory used to store files remotely. | ||
* The directory must already exist when it is given as an option; otherwise an exception is thrown. | ||
* Defaults to the empty string. | ||
*/ | ||
@NamedParameter(doc = "Directory points the CrailFS volume", short_name = "crail_dir", default_value = "") | ||
public final class CrailVolumeDirectory implements Name<String> { | ||
} | ||
|
||
/** | ||
* Selects which RemoteFileStore implementation to use: GlusterFileStore or CrailFileStore. | ||
* The default value is "Gluster", i.e. GlusterFileStore. | ||
* NOTE(review): the driver appears to compare this value against the exact lower-case string | ||
* "crail"; any other value leaves the default binding in place -- confirm the accepted spellings. | ||
*/ | ||
@NamedParameter(doc = "Option for RemoteFileStore", short_name = "remote_option", default_value = "Gluster") | ||
public final class RemoteFileStoreOpt implements Name<String> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please remove this parameter. I think only the DataStoreProperty on the IR DAG should determine which store to use. I noticed the CrailFileStore property you added. |
||
} | ||
|
||
/** | ||
* Specifies the type of the environment the workload runs on. (e.g., transient / large_shuffle) | ||
*/ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,6 +28,9 @@ | |
import org.apache.nemo.runtime.common.comm.ControlMessage; | ||
import org.apache.nemo.runtime.common.message.ClientRPC; | ||
import org.apache.nemo.runtime.common.message.MessageParameters; | ||
import org.apache.nemo.runtime.executor.data.stores.CrailFileStore; | ||
import org.apache.nemo.runtime.executor.data.stores.RemoteFileStore; | ||
|
||
import org.apache.nemo.runtime.master.BroadcastManagerMaster; | ||
import org.apache.nemo.runtime.master.RuntimeMaster; | ||
import org.apache.reef.annotations.audience.DriverSide; | ||
|
@@ -43,6 +46,7 @@ | |
import org.apache.reef.io.network.util.StringIdentifierFactory; | ||
import org.apache.reef.tang.Configuration; | ||
import org.apache.reef.tang.Configurations; | ||
import org.apache.reef.tang.JavaConfigurationBuilder; | ||
import org.apache.reef.tang.Tang; | ||
import org.apache.reef.tang.annotations.Parameter; | ||
import org.apache.reef.tang.annotations.Unit; | ||
|
@@ -80,6 +84,7 @@ public final class NemoDriver { | |
private final String localDirectory; | ||
private final String glusterDirectory; | ||
private final ClientRPC clientRPC; | ||
private final String remoteOpt; | ||
|
||
private static ExecutorService runnerThread = Executors.newSingleThreadExecutor( | ||
new BasicThreadFactory.Builder().namingPattern("User App thread-%d").build()); | ||
|
@@ -98,7 +103,8 @@ private NemoDriver(final UserApplicationRunner userApplicationRunner, | |
@Parameter(JobConf.BandwidthJSONContents.class) final String bandwidthString, | ||
@Parameter(JobConf.JobId.class) final String jobId, | ||
@Parameter(JobConf.FileDirectory.class) final String localDirectory, | ||
@Parameter(JobConf.GlusterVolumeDirectory.class) final String glusterDirectory) { | ||
@Parameter(JobConf.GlusterVolumeDirectory.class) final String glusterDirectory, | ||
@Parameter(JobConf.RemoteFileStoreOpt.class) final String remoteOpt) { | ||
IdManager.setInDriver(); | ||
this.userApplicationRunner = userApplicationRunner; | ||
this.runtimeMaster = runtimeMaster; | ||
|
@@ -110,6 +116,7 @@ private NemoDriver(final UserApplicationRunner userApplicationRunner, | |
this.glusterDirectory = glusterDirectory; | ||
this.handler = new RemoteClientMessageLoggingHandler(client); | ||
this.clientRPC = clientRPC; | ||
this.remoteOpt = remoteOpt; | ||
// TODO #69: Support job-wide execution property | ||
ResourceSitePass.setBandwidthSpecificationString(bandwidthString); | ||
clientRPC.registerHandler(ControlMessage.ClientToDriverMessageType.Notification, this::handleNotification); | ||
|
@@ -251,6 +258,10 @@ private Configuration getExecutorConfiguration(final String executorId) { | |
.set(JobConf.JOB_ID, jobId) | ||
.build(); | ||
|
||
final JavaConfigurationBuilder jcb = Tang.Factory.getTang().newConfigurationBuilder(); | ||
if (remoteOpt.equals("crail")) jcb.bindImplementation(RemoteFileStore.class, CrailFileStore.class); | ||
final Configuration remoteConf = jcb.build(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please remove. (See above) |
||
|
||
final Configuration contextConfiguration = ContextConfiguration.CONF | ||
.set(ContextConfiguration.IDENTIFIER, executorId) // We set: contextId = executorId | ||
.set(ContextConfiguration.ON_CONTEXT_STARTED, NemoContext.ContextStartHandler.class) | ||
|
@@ -260,7 +271,7 @@ private Configuration getExecutorConfiguration(final String executorId) { | |
final Configuration ncsConfiguration = getExecutorNcsConfiguration(); | ||
final Configuration messageConfiguration = getExecutorMessageConfiguration(executorId); | ||
|
||
return Configurations.merge(executorConfiguration, contextConfiguration, ncsConfiguration, messageConfiguration); | ||
return Configurations.merge(executorConfiguration, contextConfiguration, ncsConfiguration, messageConfiguration, remoteConf); | ||
} | ||
|
||
private Configuration getExecutorNcsConfiguration() { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -268,7 +268,8 @@ public void writeBlock(final Block block, | |
.setBlockId(blockId) | ||
.setState(ControlMessage.BlockStateFromExecutor.AVAILABLE); | ||
|
||
if (DataStoreProperty.Value.GlusterFileStore.equals(blockStore)) { | ||
if (DataStoreProperty.Value.GlusterFileStore.equals(blockStore) | ||
|| DataStoreProperty.Value.CrailFileStore.equals(blockStore)) { | ||
blockStateChangedMsgBuilder.setLocation(REMOTE_FILE_STORE); | ||
} else { | ||
blockStateChangedMsgBuilder.setLocation(executorId); | ||
|
@@ -302,7 +303,8 @@ public void removeBlock(final String blockId, | |
.setBlockId(blockId) | ||
.setState(ControlMessage.BlockStateFromExecutor.NOT_AVAILABLE); | ||
|
||
if (DataStoreProperty.Value.GlusterFileStore.equals(blockStore)) { | ||
if (DataStoreProperty.Value.GlusterFileStore.equals(blockStore) | ||
|| DataStoreProperty.Value.CrailFileStore.equals(blockStore)) { | ||
blockStateChangedMsgBuilder.setLocation(REMOTE_FILE_STORE); | ||
} else { | ||
blockStateChangedMsgBuilder.setLocation(executorId); | ||
|
@@ -345,7 +347,8 @@ public void run() { | |
final Optional<Block> optionalBlock = getBlockStore(blockStore).readBlock(blockId); | ||
if (optionalBlock.isPresent()) { | ||
if (DataStoreProperty.Value.LocalFileStore.equals(blockStore) | ||
|| DataStoreProperty.Value.GlusterFileStore.equals(blockStore)) { | ||
|| DataStoreProperty.Value.GlusterFileStore.equals(blockStore) | ||
|| DataStoreProperty.Value.CrailFileStore.equals(blockStore)) { | ||
final List<FileArea> fileAreas = ((FileBlock) optionalBlock.get()).asFileAreas(keyRange); | ||
for (final FileArea fileArea : fileAreas) { | ||
try (ByteOutputContext.ByteOutputStream os = outputContext.newOutputStream()) { | ||
|
@@ -420,7 +423,6 @@ private CompletableFuture<DataUtil.IteratorWithNumBytes> getDataFromLocalBlock( | |
numSerializedBytes += partition.getNumSerializedBytes(); | ||
numEncodedBytes += partition.getNumEncodedBytes(); | ||
} | ||
|
||
return CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(innerIterator, numSerializedBytes, | ||
numEncodedBytes)); | ||
} catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) { | ||
|
@@ -476,6 +478,8 @@ private BlockStore getBlockStore(final DataStoreProperty.Value blockStore) { | |
case LocalFileStore: | ||
return localFileStore; | ||
case GlusterFileStore: | ||
return localFileStore; | ||
case CrailFileStore: | ||
return remoteFileStore; | ||
default: | ||
throw new UnsupportedBlockStoreException(new Exception(blockStore + " is not supported.")); | ||
|
@@ -499,6 +503,8 @@ private static ControlMessage.BlockStore convertBlockStore( | |
case LocalFileStore: | ||
return ControlMessage.BlockStore.LOCAL_FILE; | ||
case GlusterFileStore: | ||
return ControlMessage.BlockStore.LOCAL_FILE; //since it is treated the same way as LOCAL_FILE | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure about this. |
||
case CrailFileStore: | ||
return ControlMessage.BlockStore.REMOTE_FILE; | ||
default: | ||
throw new UnsupportedBlockStoreException(new Exception(blockStore + " is not supported.")); | ||
|
@@ -522,7 +528,7 @@ private static DataStoreProperty.Value convertBlockStore( | |
case LOCAL_FILE: | ||
return DataStoreProperty.Value.LocalFileStore; | ||
case REMOTE_FILE: | ||
return DataStoreProperty.Value.GlusterFileStore; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Actually, I got stuck here without a solution: converting the ControlMessage REMOTE_FILE to either CrailFileStore or GlusterFileStore. For now, I just replaced GlusterFileStore with CrailFileStore here and let GlusterFileStore be converted to LocalFileStore, which absolutely isn't the right way :( Any suggestions? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'd use two variables: GLUSTER_FILE and CRAIL_FILE |
||
return DataStoreProperty.Value.CrailFileStore; | ||
default: | ||
throw new UnsupportedBlockStoreException(new Exception("This block store is not yet supported")); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you file a new JIRA issue to convert JobConf parameters for directory paths (FileDirectory, GlusterVolumeDirectory, CrailVolumeDirectory) to DataStoreProperty parameters? To do this, the type of the possible values of the DataStoreProperty should also change from `enum` to `class`.
I suggest this change because I think we should aim to integrate as many configuration parameters as possible into the IR DAG. Another benefit is that it saves the trouble of having to configure both JobConf and the IR DAG to use a specific Crail/Gluster/File directory. Finally, this enables us to configure different directories for different IREdges.
A related issue is integrating the executor_json parameter into the IR DAG: https://issues.apache.org/jira/browse/NEMO-382