Skip to content
This repository was archived by the owner on Jul 1, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
279 commits
Select commit Hold shift + click to select a range
957a7b9
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 14, 2019
1c4b1bb
Nemo Driver for Crail
hy00nc Feb 16, 2019
a43c842
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 16, 2019
54a6577
error handling
hy00nc Feb 16, 2019
13da157
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 16, 2019
427db91
file creation
hy00nc Feb 16, 2019
f6e3270
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 16, 2019
9fc8f8c
file creation edit
hy00nc Feb 16, 2019
a7b9635
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 16, 2019
1e6e40a
parent node created
hy00nc Feb 16, 2019
cdd20b0
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 16, 2019
3a40a3c
directory structure changed
hy00nc Feb 16, 2019
40c1077
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 16, 2019
16aa04e
error catch
hy00nc Feb 16, 2019
70e0053
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 16, 2019
d32ae05
parent node created
hy00nc Feb 16, 2019
1bec17e
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 16, 2019
0e58793
edit
hy00nc Feb 16, 2019
5031a1b
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 16, 2019
bc7edfc
edit
hy00nc Feb 16, 2019
abb4969
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 16, 2019
1127cf7
metadata creation
hy00nc Feb 16, 2019
bd6bc82
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 16, 2019
8c7b26a
edit
hy00nc Feb 16, 2019
59e8aea
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 16, 2019
bb88241
Revert "edit"
hy00nc Feb 16, 2019
a33c60d
Revert "metadata creation"
hy00nc Feb 16, 2019
105b33e
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 16, 2019
5698eff
logging
hy00nc Feb 16, 2019
079c7d7
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 16, 2019
85f4d49
logging
hy00nc Feb 16, 2019
73e5b4f
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 16, 2019
6fe0b5f
Revert "logging"
hy00nc Feb 16, 2019
73572e9
Revert "Revert "metadata creation""
hy00nc Feb 16, 2019
a9d29b3
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 16, 2019
4612dcb
logging
hy00nc Feb 16, 2019
4a98394
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 16, 2019
a1d45bc
remoteMetaData -> LocalMetadata
hy00nc Feb 16, 2019
4bf5b18
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 16, 2019
3628b46
logging
hy00nc Feb 16, 2019
163a490
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 16, 2019
813bc9b
revert
hy00nc Feb 16, 2019
3de3676
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 16, 2019
80896a3
buffer usage edit
hy00nc Feb 16, 2019
c631322
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 16, 2019
3559d3e
file structure edit
hy00nc Feb 17, 2019
72d408c
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 17, 2019
9d812e0
use LocalFileMetaData for CrailFileStore
hy00nc Feb 17, 2019
f858e7c
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 17, 2019
f7be76d
CrailFileMetaData
hy00nc Feb 17, 2019
ac06a43
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 17, 2019
c68cf24
logging
hy00nc Feb 17, 2019
ad3abb6
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 17, 2019
0a23ccf
metadata write added
hy00nc Feb 17, 2019
fdbca0d
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 17, 2019
44371db
logging
hy00nc Feb 17, 2019
ef6004a
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 17, 2019
36cb453
block fetch
hy00nc Feb 17, 2019
bf34b26
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 17, 2019
b8dbfc0
exception handling
hy00nc Feb 17, 2019
6e4961d
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 17, 2019
70f53d2
exception handling
hy00nc Feb 17, 2019
8e8748f
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 17, 2019
420f8c2
exception handling
hy00nc Feb 17, 2019
eb7c7d9
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 17, 2019
e17c9ca
fs.lookup usage edit
hy00nc Feb 17, 2019
f5d972f
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 17, 2019
4bba466
fs.lookup usage edit
hy00nc Feb 17, 2019
31f17ac
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 17, 2019
78a8cae
fs.lookup usage edit
hy00nc Feb 17, 2019
18c7f7e
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 17, 2019
0b7e11d
fs.lookup usage edit
hy00nc Feb 17, 2019
915bba8
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 17, 2019
9876826
logging revert
yooneo0124 Feb 17, 2019
fb7ae5c
fs.lookup usage edit
hy00nc Feb 17, 2019
1dcde81
edit
yooneo0124 Feb 17, 2019
3ca9c75
Merge branch 'crail' of https://github.com/hy00nc/incubator-nemo into…
yooneo0124 Feb 17, 2019
e021878
logging
hy00nc Feb 18, 2019
c22fd4a
metadata fetch
hy00nc Feb 18, 2019
e4d5eb8
metadata fetch
hy00nc Feb 18, 2019
50a99ca
exception handling
hy00nc Feb 18, 2019
e698e98
readPartitions in Crail
hy00nc Feb 18, 2019
a10bf8a
edit
hy00nc Feb 18, 2019
5b9f74a
edit
hy00nc Feb 18, 2019
911ccf3
edit
hy00nc Feb 18, 2019
bbc31d6
handle already existing metadata + added tpch
yooneo0124 Feb 18, 2019
245244b
Merge branch 'crail_meta' of https://github.com/hy00nc/incubator-nemo…
yooneo0124 Feb 18, 2019
aaeb8dc
metadata path as local dir
yooneo0124 Feb 18, 2019
e28b41c
logging delete
hy00nc Feb 20, 2019
0a74406
logging delete
hy00nc Feb 20, 2019
f5a1edb
logging delete
hy00nc Feb 20, 2019
b48869d
CrailFileBlock created
hy00nc Feb 25, 2019
cca426f
FileBlock reverted
hy00nc Feb 25, 2019
cb636b6
CrailFileBlock edit
hy00nc Feb 25, 2019
5dd038e
Merge remote-tracking branch 'jy/crail_tpch' into refactor_crail
yooneo0124 Feb 25, 2019
345fd8a
Merge branch 'refactor_crail' of https://github.com/hy00nc/incubator-…
yooneo0124 Feb 25, 2019
77761b7
metadatafile path edit revert
hy00nc Feb 25, 2019
85b0418
metadata revert
hy00nc Feb 25, 2019
b0642ac
logging
hy00nc Feb 26, 2019
5aabfe7
logging revert
hy00nc Feb 26, 2019
a408885
debugging
hy00nc Feb 26, 2019
d78979f
metadata fix
hy00nc Feb 27, 2019
42e62c7
logging
hy00nc Feb 27, 2019
f8872cb
logging
hy00nc Feb 27, 2019
03d0552
metadata logic testing + logging
hy00nc Feb 27, 2019
46617ed
metadata logging
hy00nc Feb 27, 2019
27e79f0
refactoring
hy00nc Feb 28, 2019
96e74b8
inputstream close debugging
hy00nc Mar 2, 2019
7e57edc
inputstream skip debugging
hy00nc Mar 2, 2019
cd54784
inputstream skip debugging revert
hy00nc Mar 2, 2019
c2be06d
inputstream skip debugging
hy00nc Mar 2, 2019
a3d4f80
inputstream skip debugging revert
hy00nc Mar 2, 2019
cc1b147
metadata logging
hy00nc Mar 2, 2019
e62fbc7
logging
hy00nc Mar 2, 2019
829eb57
logging
hy00nc Mar 2, 2019
e93871f
logging
hy00nc Mar 2, 2019
ab84f1a
logging
hy00nc Mar 2, 2019
c4606f1
logging
hy00nc Mar 2, 2019
61561c1
logging
hy00nc Mar 2, 2019
7d006d8
logging
hy00nc Mar 2, 2019
eac61e1
logging
hy00nc Mar 2, 2019
8ea5cf5
logging
hy00nc Mar 2, 2019
fb7b74e
logging
hy00nc Mar 2, 2019
29a35a6
logging
hy00nc Mar 2, 2019
93efe7b
logging
hy00nc Mar 2, 2019
ed2a348
logging revert
hy00nc Mar 2, 2019
3133ee8
logging edit
hy00nc Mar 4, 2019
8edcb4d
skip
hy00nc Mar 4, 2019
f66b0e5
skip revert
hy00nc Mar 4, 2019
63781b5
seek!!!!!
hy00nc Mar 4, 2019
2ed63db
seek!!!!!
hy00nc Mar 4, 2019
d40eb47
logging
hy00nc Mar 4, 2019
ec45fbe
logging
hy00nc Mar 4, 2019
20d77a8
logging
hy00nc Mar 4, 2019
c3327cf
logging
hy00nc Mar 4, 2019
c4a23c2
logging
hy00nc Mar 4, 2019
bf04595
logging
hy00nc Mar 6, 2019
4e5dfcf
logging revert
hy00nc Mar 6, 2019
296a22d
partition size logging
hy00nc Mar 7, 2019
9db6292
skip logging
hy00nc Mar 7, 2019
7aea8b7
skipbytes erased
hy00nc Mar 11, 2019
fe6e5d0
revert
hy00nc Mar 11, 2019
231cddf
test
hy00nc Mar 11, 2019
b1e4ab0
revert
hy00nc Mar 11, 2019
dd4066b
logging inputstream position
hy00nc Mar 11, 2019
2215fef
testing substitution of skip with read
hy00nc Mar 11, 2019
bf74470
revert
hy00nc Mar 11, 2019
5842d92
seek test
hy00nc Mar 11, 2019
fdc2f2b
seek test
hy00nc Mar 11, 2019
6b673a8
revert
hy00nc Mar 11, 2019
836b8cc
capacity set
hy00nc Mar 12, 2019
98f8c62
seek
hy00nc Mar 12, 2019
bf0a06e
revert
hy00nc Mar 12, 2019
5cab207
logging partitionbytes
hy00nc Mar 13, 2019
b7d3216
logging partitionbytes
hy00nc Mar 13, 2019
8eb47d9
logging partitionbytes
hy00nc Mar 13, 2019
db3ea06
logging partitionbytes
hy00nc Mar 13, 2019
2da390f
sync test
hy00nc Mar 13, 2019
34d5809
logging data read
hy00nc Mar 13, 2019
7138336
logging data read
hy00nc Mar 13, 2019
533f342
seek test
hy00nc Mar 13, 2019
984f249
refactoring
hy00nc Mar 13, 2019
00c2ab4
compression/decompression disabled
hy00nc Mar 13, 2019
bf3e5a2
watermark disabled
hy00nc Mar 13, 2019
6345cbf
log
hy00nc Mar 14, 2019
98d8f37
log
hy00nc Mar 14, 2019
5fe266f
log
hy00nc Mar 14, 2019
cbc289b
revert watermark & Compression/Decompression disable
hy00nc Mar 17, 2019
8012c2b
policy edit
hy00nc Mar 18, 2019
4205379
logging
hy00nc Mar 18, 2019
e76d33d
logging
hy00nc Mar 18, 2019
3d18ce9
logging
hy00nc Mar 18, 2019
dc28f5d
unable limitedInputStream
hy00nc Mar 18, 2019
77d8dba
revert unable LimitedInputStream
hy00nc Mar 18, 2019
fdd36db
logging
hy00nc Mar 18, 2019
0fe8691
testing w/o compression
hy00nc Mar 18, 2019
260f423
revert iterator check
hy00nc Mar 18, 2019
c74630e
able compression
hy00nc Mar 18, 2019
75cef7a
logging
hy00nc Mar 18, 2019
a308f1f
logging
hy00nc Mar 18, 2019
6e9b9ae
logging
hy00nc Mar 18, 2019
83bb13c
logging
hy00nc Mar 18, 2019
5fbc237
serialized partition length
hy00nc Mar 18, 2019
234af78
logging erased
hy00nc Mar 18, 2019
6ecbb63
new getCount() testing
hy00nc Mar 21, 2019
865ef6f
partition length debugging
hy00nc Mar 21, 2019
e449ebc
partition length debugging
hy00nc Mar 21, 2019
e8b62e0
partition length debugging
hy00nc Mar 21, 2019
e828a6b
replace to java native methods
hy00nc Mar 21, 2019
abca847
logging
hy00nc Mar 21, 2019
62cc257
logging erased
hy00nc Mar 21, 2019
26bd24f
metadata parameter added
hy00nc Mar 24, 2019
8aa0a20
metadata CrailStore and conf minimized for test
hy00nc Mar 24, 2019
48a25be
bugfix
hy00nc Mar 25, 2019
ab4cdf8
refactor comments
hy00nc Apr 29, 2019
1bf866e
checkstyle and few comments
hy00nc Apr 30, 2019
1d5a6b1
comments trimmed
hy00nc Jun 2, 2019
a1ae1bf
comments trimmed
hy00nc Jun 2, 2019
581006c
comments trimmed
hy00nc Jun 2, 2019
1bb1d9b
reverted
hy00nc Jun 2, 2019
284fa86
refactoring
hy00nc Jun 3, 2019
fa43b9a
crail-assembly deletion -> build failure
hy00nc Jun 3, 2019
76f34ca
??
hy00nc Jun 3, 2019
108466e
Crail API changed
hy00nc Jun 3, 2019
185b32e
crail directory just in the root since it is just an ephemeral data...
hy00nc Jun 5, 2019
a25eef2
crail API
hy00nc Jun 5, 2019
d57c442
dependency
hy00nc Jun 5, 2019
c6ae2ef
all dependencies
hy00nc Jun 5, 2019
8d5e21a
dependency
hy00nc Jun 5, 2019
94e1928
path edit
hy00nc Jun 5, 2019
e04d02b
path edit
hy00nc Jun 5, 2019
76c0e51
refactoring
hy00nc Jun 5, 2019
a6a15f4
refactoring
hy00nc Jun 5, 2019
5cd3690
refactoring
hy00nc Jun 5, 2019
31a3456
refactoring
hy00nc Jun 5, 2019
23dbd82
refactoring
hy00nc Jun 5, 2019
0d84962
refactoring
hy00nc Jun 5, 2019
cc8827e
checkstyle
hy00nc Jun 7, 2019
d319465
option for using CrailFS
hy00nc Jun 10, 2019
f141d7b
testing crail option
hy00nc Jun 10, 2019
2f50495
logging
hy00nc Jun 10, 2019
47ee30c
logging
hy00nc Jun 10, 2019
5d3d7fd
tests
hy00nc Jun 10, 2019
7468fdd
checkstyle
hy00nc Jun 10, 2019
16de92e
checkstyle and tests
hy00nc Jun 10, 2019
ba15a5e
for TPC-H
hy00nc Jun 10, 2019
bd54584
tpch
hy00nc Jun 10, 2019
795da83
Revert "tpch"
hy00nc Jun 10, 2019
ae04605
tpch
hy00nc Jun 10, 2019
d54ba3c
this is the right tpch :)
hy00nc Jun 10, 2019
f838cdc
gluster and crail conflict issue...
hy00nc Jun 10, 2019
ac40d0d
back to original
hy00nc Jun 10, 2019
231130c
revert tpch
hy00nc Jun 10, 2019
0a0b62c
minor edits
hy00nc Jun 11, 2019
4d0eb21
pom.xml version
hy00nc Jun 24, 2019
0e00fb6
BlockManagerWorker reuse code
hy00nc Jun 24, 2019
d1c9304
FileBlock to support Crail file (exception not specified yet)
hy00nc Jun 24, 2019
5b1a71b
delete CrailFileBlock
hy00nc Jun 24, 2019
4a0b3ea
Merge branch 'master' into Crail_PR
hy00nc Jun 25, 2019
29e708f
API update
hy00nc Jun 25, 2019
f154cd7
crail dependency
hy00nc Jun 25, 2019
4cf9c32
import edit
hy00nc Jun 25, 2019
35a6685
crail version edit
hy00nc Jun 25, 2019
707847a
checkstyle
hy00nc Jun 25, 2019
c637f34
checktyle (CrailFileMetaData needs refactoring)
hy00nc Jun 25, 2019
c9f8b2c
API back to 1.1
hy00nc Jun 25, 2019
949e248
Merge branch 'master' into Crail_PR
hy00nc Jun 25, 2019
3ea30cd
import
hy00nc Jun 25, 2019
1ec909e
cannot resolve outputstream?
hy00nc Jun 25, 2019
a6ed1d6
cannot resolve outputstream?
hy00nc Jun 25, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions client/src/main/java/org/apache/nemo/client/JobLauncher.java
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,8 @@ public static Configuration getJobConf(final String[] args) throws IOException,
cl.registerShortNameOfClass(JobConf.MaxTaskAttempt.class);
cl.registerShortNameOfClass(JobConf.FileDirectory.class);
cl.registerShortNameOfClass(JobConf.GlusterVolumeDirectory.class);
cl.registerShortNameOfClass(JobConf.CrailVolumeDirectory.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you file a new JIRA issue to convert JobConf parameters for directory paths (FileDirectory, GlusterVolumeDirectory, CrailVolumeDirectory) to DataStoreProperty parameters? To do this, the type of the possible values of the DataStoreProperty should also change from enum to class.

I suggest this change, because I think we should aim to integrate into the IR DAG as many configuration parameters as possible. Another benefit is that it saves the trouble of having to configure both JobConf and IR DAG to use a specific Crail/Gluster/File directory. Finally, this enables us to configure different directories for different IREdges.

A related issue is integrating the executor_json parameter into the IR DAG: https://issues.apache.org/jira/browse/NEMO-382

cl.registerShortNameOfClass(JobConf.RemoteFileStoreOpt.class);
cl.registerShortNameOfClass(JobConf.PartitionTransportServerPort.class);
cl.registerShortNameOfClass(JobConf.PartitionTransportServerBacklog.class);
cl.registerShortNameOfClass(JobConf.PartitionTransportServerNumListeningThreads.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public enum Value {
MemoryStore,
SerializedMemoryStore,
LocalFileStore,
GlusterFileStore
GlusterFileStore,
CrailFileStore
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;

import org.apache.nemo.common.ir.IRDAG;
import org.apache.nemo.common.ir.edge.IREdge;
import org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty;
import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;

import java.util.List;

/**
* A pass to support Disaggregated Resources by tagging edges.
* This pass handles the DataStore ExecutionProperty.
*/
@Annotates(DataStoreProperty.class)
@Requires(DataStoreProperty.class)
public final class CrailEdgeDataStorePass extends AnnotatingPass {
/**
* Default constructor.
*/
public CrailEdgeDataStorePass() {
super(CrailEdgeDataStorePass.class);
}

@Override
public IRDAG apply(final IRDAG dag) {
dag.getVertices().forEach(vertex -> { // Initialize the DataStore of the DAG with CrailFileStore.
final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
inEdges.forEach(edge ->
edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.CrailFileStore)));
});
return dag;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.compiler.optimizer.policy;

import org.apache.nemo.common.ir.IRDAG;
import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.*;
import org.apache.nemo.compiler.optimizer.pass.compiletime.composite.DefaultCompositePass;
import org.apache.nemo.compiler.optimizer.pass.runtime.Message;

/**
* A policy to demonstrate the disaggregation optimization, that uses CrailFS as file storage.
*/
public final class CrailPolicy implements Policy {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add an integration test to test this policy?
I understand that it is tricky to start and stop Crail processes, which are external to Nemo.
I wonder if Crail provides an option such as the HDFS minicluster option. Something like that would make things much easier.
https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/CLIMiniCluster.html

public static final PolicyBuilder BUILDER =
new PolicyBuilder()
.registerCompileTimePass(new CrailEdgeDataStorePass())
.registerCompileTimePass(new DefaultCompositePass());
private final Policy policy;

/**
* Default constructor.
*/
public CrailPolicy() {
this.policy = BUILDER.build();
}

@Override
public IRDAG runCompileTimeOptimization(final IRDAG dag, final String dagDirectory) {
return this.policy.runCompileTimeOptimization(dag, dagDirectory);
}

@Override
public IRDAG runRunTimeOptimizations(final IRDAG dag, final Message<?> message) {
return this.policy.runRunTimeOptimizations(dag, message);
}
}
15 changes: 15 additions & 0 deletions conf/src/main/java/org/apache/nemo/conf/JobConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,21 @@ public final class FileDirectory implements Name<String> {
public final class GlusterVolumeDirectory implements Name<String> {
}

/** Directory points the CrailFileStore volume used to store files in remote fashion.
* The directory has to be already created to give it as an option. (else exception is thrown)
*/
@NamedParameter(doc = "Directory points the CrailFS volume", short_name = "crail_dir", default_value = "")
public final class CrailVolumeDirectory implements Name<String> {
}

/**
* RemoteFileStore option specification. Two choices are available: GlusterFileStore or CrailFileStore.
* Default is the former one.
*/
@NamedParameter(doc = "Option for RemoteFileStore", short_name = "remote_option", default_value = "Gluster")
public final class RemoteFileStoreOpt implements Name<String> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this parameter. I think only the DataStoreProperty on the IR DAG should determine which store to use.

I noticed the CrailFileStore property you added.
https://github.com/apache/incubator-nemo/pull/219/files#diff-aa9a871d8e9c9075327f21350eab71d1R53

}

/**
* Specifies the type of the environment the workload runs on. (e.g., transient / large_shuffle)
*/
Expand Down
52 changes: 51 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ under the License.
<url>https://github.com/apache/incubator-nemo/tree/${project.scm.tag}</url>
<tag>master</tag>
</scm>

<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
Expand All @@ -65,6 +64,7 @@ under the License.
<sqlite-jdbc.version>3.25.2</sqlite-jdbc.version>
<postgresql.version>42.2.5</postgresql.version>
<slf4j.version>1.7.20</slf4j.version>
<crail.version>1.1-incubating</crail.version>
<!-- Tests -->
<mockito.version>2.13.0</mockito.version>
<powermock.version>2.0.0-beta.5</powermock.version>
Expand Down Expand Up @@ -93,6 +93,56 @@ under the License.
<artifactId>snappy-java</artifactId>
<version>1.1.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.crail</groupId>
<artifactId>crail-storage</artifactId>
<version>${crail.version}</version>
</dependency>
<dependency>
<groupId>org.apache.crail</groupId>
<artifactId>crail-storage-narpc</artifactId>
<version>${crail.version}</version>
</dependency>
<dependency>
<groupId>org.apache.crail</groupId>
<artifactId>crail-storage-rdma</artifactId>
<version>${crail.version}</version>
</dependency>
<dependency>
<groupId>org.apache.crail</groupId>
<artifactId>crail-storage-nvmf</artifactId>
<version>${crail.version}</version>
</dependency>
<dependency>
<groupId>org.apache.crail</groupId>
<artifactId>crail-rpc</artifactId>
<version>${crail.version}</version>
</dependency>
<dependency>
<groupId>org.apache.crail</groupId>
<artifactId>crail-rpc-narpc</artifactId>
<version>${crail.version}</version>
</dependency>
<dependency>
<groupId>org.apache.crail</groupId>
<artifactId>crail-hdfs</artifactId>
<version>${crail.version}</version>
</dependency>
<dependency>
<groupId>org.apache.crail</groupId>
<artifactId>crail-namenode</artifactId>
<version>${crail.version}</version>
</dependency>
<dependency>
<groupId>org.apache.crail</groupId>
<artifactId>crail-rpc</artifactId>
<version>${crail.version}</version>
</dependency>
<dependency>
<groupId>org.apache.crail</groupId>
<artifactId>crail-client</artifactId>
<version>${crail.version}</version>
</dependency>
<!--tests-->
<dependency>
<groupId>junit</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import org.apache.nemo.runtime.common.comm.ControlMessage;
import org.apache.nemo.runtime.common.message.ClientRPC;
import org.apache.nemo.runtime.common.message.MessageParameters;
import org.apache.nemo.runtime.executor.data.stores.CrailFileStore;
import org.apache.nemo.runtime.executor.data.stores.RemoteFileStore;

import org.apache.nemo.runtime.master.BroadcastManagerMaster;
import org.apache.nemo.runtime.master.RuntimeMaster;
import org.apache.reef.annotations.audience.DriverSide;
Expand All @@ -43,6 +46,7 @@
import org.apache.reef.io.network.util.StringIdentifierFactory;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Configurations;
import org.apache.reef.tang.JavaConfigurationBuilder;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.annotations.Unit;
Expand Down Expand Up @@ -80,6 +84,7 @@ public final class NemoDriver {
private final String localDirectory;
private final String glusterDirectory;
private final ClientRPC clientRPC;
private final String remoteOpt;

private static ExecutorService runnerThread = Executors.newSingleThreadExecutor(
new BasicThreadFactory.Builder().namingPattern("User App thread-%d").build());
Expand All @@ -98,7 +103,8 @@ private NemoDriver(final UserApplicationRunner userApplicationRunner,
@Parameter(JobConf.BandwidthJSONContents.class) final String bandwidthString,
@Parameter(JobConf.JobId.class) final String jobId,
@Parameter(JobConf.FileDirectory.class) final String localDirectory,
@Parameter(JobConf.GlusterVolumeDirectory.class) final String glusterDirectory) {
@Parameter(JobConf.GlusterVolumeDirectory.class) final String glusterDirectory,
@Parameter(JobConf.RemoteFileStoreOpt.class) final String remoteOpt) {
IdManager.setInDriver();
this.userApplicationRunner = userApplicationRunner;
this.runtimeMaster = runtimeMaster;
Expand All @@ -110,6 +116,7 @@ private NemoDriver(final UserApplicationRunner userApplicationRunner,
this.glusterDirectory = glusterDirectory;
this.handler = new RemoteClientMessageLoggingHandler(client);
this.clientRPC = clientRPC;
this.remoteOpt = remoteOpt;
// TODO #69: Support job-wide execution property
ResourceSitePass.setBandwidthSpecificationString(bandwidthString);
clientRPC.registerHandler(ControlMessage.ClientToDriverMessageType.Notification, this::handleNotification);
Expand Down Expand Up @@ -251,6 +258,10 @@ private Configuration getExecutorConfiguration(final String executorId) {
.set(JobConf.JOB_ID, jobId)
.build();

final JavaConfigurationBuilder jcb = Tang.Factory.getTang().newConfigurationBuilder();
if (remoteOpt.equals("crail")) jcb.bindImplementation(RemoteFileStore.class, CrailFileStore.class);
final Configuration remoteConf = jcb.build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove. (See above)


final Configuration contextConfiguration = ContextConfiguration.CONF
.set(ContextConfiguration.IDENTIFIER, executorId) // We set: contextId = executorId
.set(ContextConfiguration.ON_CONTEXT_STARTED, NemoContext.ContextStartHandler.class)
Expand All @@ -260,7 +271,7 @@ private Configuration getExecutorConfiguration(final String executorId) {
final Configuration ncsConfiguration = getExecutorNcsConfiguration();
final Configuration messageConfiguration = getExecutorMessageConfiguration(executorId);

return Configurations.merge(executorConfiguration, contextConfiguration, ncsConfiguration, messageConfiguration);
return Configurations.merge(executorConfiguration, contextConfiguration, ncsConfiguration, messageConfiguration, remoteConf);
}

private Configuration getExecutorNcsConfiguration() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,8 @@ public void writeBlock(final Block block,
.setBlockId(blockId)
.setState(ControlMessage.BlockStateFromExecutor.AVAILABLE);

if (DataStoreProperty.Value.GlusterFileStore.equals(blockStore)) {
if (DataStoreProperty.Value.GlusterFileStore.equals(blockStore)
|| DataStoreProperty.Value.CrailFileStore.equals(blockStore)) {
blockStateChangedMsgBuilder.setLocation(REMOTE_FILE_STORE);
} else {
blockStateChangedMsgBuilder.setLocation(executorId);
Expand Down Expand Up @@ -302,7 +303,8 @@ public void removeBlock(final String blockId,
.setBlockId(blockId)
.setState(ControlMessage.BlockStateFromExecutor.NOT_AVAILABLE);

if (DataStoreProperty.Value.GlusterFileStore.equals(blockStore)) {
if (DataStoreProperty.Value.GlusterFileStore.equals(blockStore)
|| DataStoreProperty.Value.CrailFileStore.equals(blockStore)) {
blockStateChangedMsgBuilder.setLocation(REMOTE_FILE_STORE);
} else {
blockStateChangedMsgBuilder.setLocation(executorId);
Expand Down Expand Up @@ -345,7 +347,8 @@ public void run() {
final Optional<Block> optionalBlock = getBlockStore(blockStore).readBlock(blockId);
if (optionalBlock.isPresent()) {
if (DataStoreProperty.Value.LocalFileStore.equals(blockStore)
|| DataStoreProperty.Value.GlusterFileStore.equals(blockStore)) {
|| DataStoreProperty.Value.GlusterFileStore.equals(blockStore)
|| DataStoreProperty.Value.CrailFileStore.equals(blockStore)) {
final List<FileArea> fileAreas = ((FileBlock) optionalBlock.get()).asFileAreas(keyRange);
for (final FileArea fileArea : fileAreas) {
try (ByteOutputContext.ByteOutputStream os = outputContext.newOutputStream()) {
Expand Down Expand Up @@ -420,7 +423,6 @@ private CompletableFuture<DataUtil.IteratorWithNumBytes> getDataFromLocalBlock(
numSerializedBytes += partition.getNumSerializedBytes();
numEncodedBytes += partition.getNumEncodedBytes();
}

return CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(innerIterator, numSerializedBytes,
numEncodedBytes));
} catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
Expand Down Expand Up @@ -476,6 +478,8 @@ private BlockStore getBlockStore(final DataStoreProperty.Value blockStore) {
case LocalFileStore:
return localFileStore;
case GlusterFileStore:
return localFileStore;
case CrailFileStore:
return remoteFileStore;
default:
throw new UnsupportedBlockStoreException(new Exception(blockStore + " is not supported."));
Expand All @@ -499,6 +503,8 @@ private static ControlMessage.BlockStore convertBlockStore(
case LocalFileStore:
return ControlMessage.BlockStore.LOCAL_FILE;
case GlusterFileStore:
return ControlMessage.BlockStore.LOCAL_FILE; //since it is treated the same way as LOCAL_FILE
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about this.
When using GlusterFileStore, a file can be accessed from any executor, whereas when using LocalFileStore, a file can be accessed only from the executor that produced the file

case CrailFileStore:
return ControlMessage.BlockStore.REMOTE_FILE;
default:
throw new UnsupportedBlockStoreException(new Exception(blockStore + " is not supported."));
Expand All @@ -522,7 +528,7 @@ private static DataStoreProperty.Value convertBlockStore(
case LOCAL_FILE:
return DataStoreProperty.Value.LocalFileStore;
case REMOTE_FILE:
return DataStoreProperty.Value.GlusterFileStore;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I got stuck here without a solution: Converting the ControlMessage REMOTE_FILE to either CrailFileStore or GlusterFileStore. For now, I just substituted GlusterFileStore to CrailFileStore, and let GlusterFileStore be converted to LocalFileStore, which absolutely isn't the right way :( Any suggestions?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd use two variables: GLUSTER_FILE and CRAIL_FILE

return DataStoreProperty.Value.CrailFileStore;
default:
throw new UnsupportedBlockStoreException(new Exception("This block store is not yet supported"));
}
Expand Down
Loading