Feature/hdfs dev #270

Status: Open. Wants to merge 27 commits into base: master from feature/hdfs_dev.

Commits (27)
5fab841  Implement fetching data from HDFS to partitions (muthumala19, Aug 19, 2024)
eb0a48a  Add copyrights and license (muthumala19, Aug 19, 2024)
e2aea11  Implement HashPartitioner class (muthumala19, Sep 26, 2024)
bef3e04  Create HashPartitioner with separation of concern (muthumala19, Sep 28, 2024)
83eb758  Create HashPartitioner using thread per partition (muthumala19, Oct 3, 2024)
dc0187f  Temp: DataPublisher communication implementation (muthumala19, Oct 15, 2024)
de981d2  Temp: Implementation with DataPublisher objects (muthumala19, Nov 26, 2024)
7f0659a  Create util method for master-worker communication (muthumala19, Dec 10, 2024)
5074141  Add code to distribute data to the partitions (muthumala19, Dec 10, 2024)
25daa9a  Change fetching,sending data from HDFS to workers (muthumala19, Dec 10, 2024)
ea4bf80  Add validaton for HDFS dataset paths (muthumala19, Dec 10, 2024)
b75d9fd  Modify counting edges for HDFS implementation (muthumala19, Dec 10, 2024)
4212e46  Extend for handling undirected graphs (muthumala19, Dec 14, 2024)
8150120  Add copyrights and license (muthumala19, Dec 21, 2024)
1a770e7  Add updating uploadEndTime in metaDB (muthumala19, Dec 23, 2024)
d7341b9  Add integration tests for HDFS implementation (muthumala19, Dec 27, 2024)
1c21ae5  Merge branch 'miyurud:master' into feature/hdfs_dev (muthumala19, Dec 29, 2024)
6679c8b  Add recent prerequisites tag: 20241231T070657 (muthumala19, Dec 31, 2024)
a8782c2  Fix cpplint issues (muthumala19, Dec 31, 2024)
9833128  Fix cpplint issues in test.py (muthumala19, Dec 31, 2024)
915529e  Fix cpplint issues in test.py (ii) (muthumala19, Dec 31, 2024)
17deb1b  Fix invalid path to HDFS data in test-docker.sh (muthumala19, Jan 2, 2025)
0ca9194  Merge branch 'master' into feature/hdfs_dev (muthumala19, Jan 2, 2025)
273730f  Remove default HDFS server testing (muthumala19, Jan 2, 2025)
93a08bc  Merge remote-tracking branch 'origin/feature/hdfs_dev' into feature/h… (muthumala19, Jan 2, 2025)
de4cffa  Set to the most recent prerequisites tag (muthumala19, Jan 2, 2025)
68e8885  Adjust for removal of default HDFS server testing (muthumala19, Jan 2, 2025)
11 changes: 11 additions & 0 deletions CMakeLists.txt
@@ -79,6 +79,9 @@ set(HEADERS globals.h
src/streamingdb/StreamingSQLiteDBInterface.h
src/frontend/core/executor/impl/PageRankExecutor.h
src/util/dbinterface/DBInterface.h
src/util/hdfs/HDFSConnector.h
src/util/hdfs/HDFSStreamHandler.h
src/partitioner/stream/HashPartitioner.h
src/query/processor/cypher/util/Const.h
)

@@ -148,6 +151,9 @@ set(SOURCES src/backend/JasmineGraphBackend.cpp
src/streamingdb/StreamingSQLiteDBInterface.cpp
src/frontend/core/executor/impl/PageRankExecutor.cpp
src/util/dbinterface/DBInterface.cpp
src/util/hdfs/HDFSConnector.cpp
src/util/hdfs/HDFSStreamHandler.cpp
src/partitioner/stream/HashPartitioner.cpp
src/query/processor/cypher/semanticanalyzer/SemanticAnalyzer.cpp
src/query/processor/cypher/util/Const.cpp
)
@@ -179,9 +185,14 @@ target_link_libraries(JasmineGraphLib PRIVATE /usr/lib/x86_64-linux-gnu/libxerce
target_link_libraries(JasmineGraphLib PRIVATE /usr/lib/x86_64-linux-gnu/libflatbuffers.a)
target_link_libraries(JasmineGraphLib PRIVATE /usr/lib/x86_64-linux-gnu/libjsoncpp.so)
target_link_libraries(JasmineGraphLib PRIVATE /usr/local/lib/libcppkafka.so)
target_link_libraries(JasmineGraphLib PRIVATE /usr/local/libhdfs3/lib/libhdfs3.so)
target_link_libraries(JasmineGraph JasmineGraphLib)
target_link_libraries(JasmineGraph curl)

include_directories(/usr/local/libhdfs3/include/hdfs/)
include_directories(/usr/include/oneapi/)
include_directories(/usr/local/parmetis/include/)
include_directories(/usr/lib/x86_64-linux-gnu/openmpi/include/)
include_directories(/usr/local/include/antlr4-runtime)
link_directories(/usr/local/lib)
include_directories(/usr/local/include/yaml-cpp)
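The newly linked libhdfs3 library and its include directory are what the HDFSConnector and HDFSStreamHandler sources added in this PR build on. As a rough, hypothetical illustration only (not code from this PR), this is the kind of libhdfs3 C API call sequence that becomes available once the library is linked; the host, port, and path are placeholder values that mirror the defaults added to conf/jasminegraph-server.properties later in this diff:

#include <hdfs.h>     // resolved via the include_directories(/usr/local/libhdfs3/include/hdfs/) line above
#include <iostream>

int main() {
    // Connect to a NameNode (placeholder endpoint).
    hdfsFS fs = hdfsConnect("10.8.100.246", 9000);
    if (fs == nullptr) {
        std::cerr << "HDFS connection failed" << std::endl;
        return 1;
    }
    // hdfsExists() returns 0 when the path is present; this is the kind of check
    // a path-validation helper such as HDFSConnector::isPathValid could perform.
    if (hdfsExists(fs, "/user/jasminegraph/edges.txt") == 0) {
        std::cout << "path exists" << std::endl;
    }
    hdfsDisconnect(fs);
    return 0;
}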
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,4 +1,4 @@
FROM miyurud/jasminegraph-prerequisites:20241230T132919
FROM miyurud/jasminegraph-prerequisites:20241231T070657

RUN apt-get update && apt-get install -y libcurl4-openssl-dev sysstat nmon
RUN rm -r /usr/lib/python3.8/distutils
2 changes: 2 additions & 0 deletions conf/hdfs_config/consumer.properties
@@ -0,0 +1,2 @@
hdfs.host=127.0.0.1
Owner comment on this line: why is this localhost IP?

hdfs.port=9000
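When the user declines the default HDFS server, the frontend code further down in this diff reads an equivalent key=value file from a path the user supplies, skipping lines that begin with #. A hypothetical example of such a user-provided file, with a placeholder remote host, might look like this:

# Custom HDFS endpoint for streaming ingestion (placeholder values)
hdfs.host=hdfs-namenode.example.org
hdfs.port=9000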
3 changes: 3 additions & 0 deletions conf/jasminegraph-server.properties
@@ -24,6 +24,8 @@ org.jasminegraph.server.nworkers=2
#org.jasminegraph.server.npartitions is the number of partitions into which the graph should be partitioned
org.jasminegraph.server.npartitions=2
org.jasminegraph.server.streaming.kafka.host=127.0.0.1:9092
org.jasminegraph.server.streaming.hdfs.host=hdfs://10.8.100.246
org.jasminegraph.server.streaming.hdfs.port=9000
org.jasminegraph.worker.path=/var/tmp/jasminegraph/
#Path to keep jasminegraph artifacts in order to copy them to remote locations. If this is not set then the artifacts in the
#JASMINEGRAPH_HOME location will be copied instead.
@@ -32,6 +34,7 @@ org.jasminegraph.artifact.path=
#The following folder is the location where workers keep their data.
#This is the location where the actual data storage takes place in JasmineGraph.
org.jasminegraph.server.instance.datafolder=/var/tmp/jasminegraph-localstore
org.jasminegraph.server.instance.tempdatafolder=/var/tmp/tempfiles
#The folder path for keeping central stores for triangle count aggregation
org.jasminegraph.server.instance.aggregatefolder=/var/tmp/jasminegraph-aggregate
org.jasminegraph.server.instance.trainedmodelfolder=/var/tmp/jasminegraph-localstore/jasminegraph-local_trained_model_store
196 changes: 193 additions & 3 deletions src/frontend/JasmineGraphFrontEnd.cpp
@@ -45,14 +45,15 @@
#include "JasmineGraphFrontEndProtocol.h"
#include "core/CoreConstants.h"
#include "core/scheduler/JobScheduler.h"
#include "../util/hdfs/HDFSConnector.h"
#include "../util/hdfs/HDFSStreamHandler.h"
#include "antlr4-runtime.h"
#include "/home/ubuntu/software/antlr/CypherLexer.h"
#include "/home/ubuntu/software/antlr/CypherParser.h"
#include "../query/processor/cypher/astbuilder/ASTBuilder.h"
#include "../query/processor/cypher/astbuilder/ASTNode.h"
#include "../query/processor/cypher/semanticanalyzer/SemanticAnalyzer.h"


#define MAX_PENDING_CONNECTIONS 10
#define DATA_BUFFER_SIZE (FRONTEND_DATA_LENGTH + 1)

@@ -81,6 +82,9 @@
KafkaConnector *&kstream, thread &input_stream_handler_thread,
vector<DataPublisher *> &workerClients, int numberOfPartitions,
SQLiteDBInterface *sqlite, bool *loop_exit_p);
static void addStreamHDFSCommand(std::string masterIP, int connFd, std::string &hdfsServerIp,
std::thread &inputStreamHandlerThread, int numberOfPartitions,
SQLiteDBInterface *sqlite, bool *loop_exit_p);
static void stop_stream_kafka_command(int connFd, KafkaConnector *kstream, bool *loop_exit_p);
static void process_dataset_command(int connFd, bool *loop_exit_p);
static void triangles_command(std::string masterIP, int connFd, SQLiteDBInterface *sqlite,
@@ -109,8 +113,9 @@
for (int i = 0; i < workerList.size(); i++) {
Utils::worker currentWorker = workerList.at(i);
string workerHost = currentWorker.hostname;
int workerDataPort = std::stoi(currentWorker.dataPort);
int workerPort = atoi(string(currentWorker.port).c_str());
DataPublisher *workerClient = new DataPublisher(workerPort, workerHost);
DataPublisher *workerClient = new DataPublisher(workerPort, workerHost, workerDataPort);
workerClients.push_back(workerClient);
}
return workerClients;
@@ -144,6 +149,10 @@
KafkaConnector *kstream;
Partitioner graphPartitioner(numberOfPartitions, 1, spt::Algorithms::HASH, sqlite);

// Initiate HDFS parameters
std::string hdfsServerIp;

[Codecov warning: added line #L153 not covered by tests]
hdfsFS fileSystem;

vector<DataPublisher *> workerClients;
bool workerClientsInitialized = false;

@@ -198,6 +207,9 @@
}
add_stream_kafka_command(connFd, kafka_server_IP, configs, kstream, input_stream_handler, workerClients,
numberOfPartitions, sqlite, &loop_exit);
} else if (line.compare(ADD_STREAM_HDFS) == 0) {
addStreamHDFSCommand(masterIP, connFd, hdfsServerIp, input_stream_handler, numberOfPartitions,
sqlite, &loop_exit);
} else if (line.compare(STOP_STREAM_KAFKA) == 0) {
stop_stream_kafka_command(connFd, kstream, &loop_exit);
} else if (line.compare(RMGR) == 0) {
@@ -1304,8 +1316,186 @@
input_stream_handler_thread = thread(&StreamHandler::listen_to_kafka_topic, stream_handler);
}

void addStreamHDFSCommand(std::string masterIP, int connFd, std::string &hdfsServerIp,
std::thread &inputStreamHandlerThread, int numberOfPartitions,
SQLiteDBInterface *sqlite, bool *loop_exit_p) {
[Codecov warning: added line #L1319 not covered by tests]
std::string hdfsPort;

[Codecov warning: added line #L1322 not covered by tests]
std::string msg1 = "Do you want to use the default HDFS server(y/n)?";
int resultWr = write(connFd, msg1.c_str(), msg1.length());
if (resultWr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;

[Codecov warning: added lines #L1327-L1328 not covered by tests]
}
resultWr = write(connFd, "\r\n", 2);
if (resultWr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;

[Codecov warning: added lines #L1333-L1334 not covered by tests]
}

char userRes[FRONTEND_DATA_LENGTH + 1];
bzero(userRes, FRONTEND_DATA_LENGTH + 1);

[Codecov warning: added line #L1338 not covered by tests]
read(connFd, userRes, FRONTEND_DATA_LENGTH);
std::string userResS(userRes);
userResS = Utils::trim_copy(userResS);
for (char &c : userResS) {
c = tolower(c);

[Codecov warning: added line #L1343 not covered by tests]
}

if (userResS == "y") {
hdfsServerIp = Utils::getJasmineGraphProperty("org.jasminegraph.server.streaming.hdfs.host");
hdfsPort = Utils::getJasmineGraphProperty("org.jasminegraph.server.streaming.hdfs.port");
} else {
std::string message = "Send the file path to the HDFS configuration file.";
resultWr = write(connFd, message.c_str(), message.length());
if (resultWr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;

[Codecov warning: added lines #L1354-L1355 not covered by tests]
}
resultWr = write(connFd, "\r\n", 2);
if (resultWr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;

[Codecov warning: added lines #L1360-L1361 not covered by tests]
}

char filePath[FRONTEND_DATA_LENGTH + 1];
bzero(filePath, FRONTEND_DATA_LENGTH + 1);

[Codecov warning: added line #L1365 not covered by tests]
read(connFd, filePath, FRONTEND_DATA_LENGTH);
std::string filePathS(filePath);
filePathS = Utils::trim_copy(filePathS);

frontend_logger.info("Reading HDFS configuration file: " + filePathS);

std::vector<std::string> vec = Utils::getFileContent(filePathS);
for (const auto &item : vec) {
if (item.length() > 0 && !(item.rfind("#", 0) == 0)) {
std::vector<std::string> vec2 = Utils::split(item, '=');
if (vec2.size() == 2) {
if (vec2.at(0).compare("hdfs.host") == 0) {
hdfsServerIp = vec2.at(1);
} else if (vec2.at(0).compare("hdfs.port") == 0) {
hdfsPort = vec2.at(1);
}
} else {
frontend_logger.error("Invalid line in configuration file: " + item);
}
}

[Codecov warning: added line #L1385 not covered by tests]
}
}

if (hdfsServerIp.empty()) {
frontend_logger.error("HDFS server IP is not set or empty.");
}
if (hdfsPort.empty()) {
frontend_logger.error("HDFS server port is not set or empty.");
}

std::string message = "HDFS file path: ";
resultWr = write(connFd, message.c_str(), message.length());
if (resultWr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;

[Codecov warning: added lines #L1400-L1401 not covered by tests]
}
resultWr = write(connFd, "\r\n", 2);
if (resultWr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;

[Codecov warning: added lines #L1406-L1407 not covered by tests]
}

char hdfsFilePath[FRONTEND_DATA_LENGTH + 1];
bzero(hdfsFilePath, FRONTEND_DATA_LENGTH + 1);

[Codecov warning: added line #L1411 not covered by tests]
read(connFd, hdfsFilePath, FRONTEND_DATA_LENGTH);
std::string hdfsFilePathS(hdfsFilePath);
hdfsFilePathS = Utils::trim_copy(hdfsFilePathS);

HDFSConnector *hdfsConnector = new HDFSConnector(hdfsServerIp, hdfsPort);

if (!hdfsConnector->isPathValid(hdfsFilePathS)) {
frontend_logger.error("Invalid HDFS file path: " + hdfsFilePathS);
std::string error_message = "The provided HDFS path is invalid.";
write(connFd, error_message.c_str(), error_message.length());
write(connFd, "\r\n", 2);
delete hdfsConnector;
*loop_exit_p = true;
return;
}

[Codecov warning: added lines #L1424-L1426 not covered by tests]

/*get directionality*/
std::string isDirectedGraph = "Is this a directed graph(y/n)?";
resultWr = write(connFd, isDirectedGraph.c_str(), isDirectedGraph.length());
if (resultWr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;

[Codecov warning: added lines #L1433-L1434 not covered by tests]
}
resultWr = write(connFd, "\r\n", 2);
if (resultWr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;

[Codecov warning: added lines #L1439-L1440 not covered by tests]
}

char isDirectedRes[FRONTEND_DATA_LENGTH + 1];
bzero(isDirectedRes, FRONTEND_DATA_LENGTH + 1);

[Codecov warning: added line #L1444 not covered by tests]
read(connFd, isDirectedRes, FRONTEND_DATA_LENGTH);
std::string isDirectedS(isDirectedRes);
isDirectedS = Utils::trim_copy(isDirectedS);

bool directed;
if (isDirectedS == "y") {
directed = true;

[Codecov warning: added line #L1451 not covered by tests]
} else {
directed = false;

[Codecov warning: added line #L1453 not covered by tests]
}

std::string path = "hdfs:" + hdfsFilePathS;

std::time_t time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());

[Codecov warning: added line #L1458 not covered by tests]
std::string uploadStartTime = ctime(&time);
std::string sqlStatement =
"INSERT INTO graph (name, upload_path, upload_start_time, upload_end_time, graph_status_idgraph_status, "
"vertexcount, centralpartitioncount, edgecount, is_directed) VALUES(\"" +
hdfsFilePathS + "\", \"" + path + "\", \"" + uploadStartTime + "\", \"\", \"" +
std::to_string(Conts::GRAPH_STATUS::NONOPERATIONAL) + "\", \"\", \"\", \"\", \"" +
(directed ? "TRUE" : "FALSE") + "\")";

int newGraphID = sqlite->runInsert(sqlStatement);
frontend_logger.info("Created graph ID: " + std::to_string(newGraphID));
HDFSStreamHandler *streamHandler = new HDFSStreamHandler(hdfsConnector->getFileSystem(), hdfsFilePathS,
numberOfPartitions, newGraphID, sqlite, masterIP);
frontend_logger.info("Started listening to " + hdfsFilePathS);
inputStreamHandlerThread = std::thread(&HDFSStreamHandler::startStreamingFromBufferToPartitions, streamHandler);
inputStreamHandlerThread.join();

std::string uploadEndTime = ctime(&time);
std::string sqlStatementUpdateEndTime =
"UPDATE graph "
"SET upload_end_time = \"" + uploadEndTime + "\" "
"WHERE idgraph = " + std::to_string(newGraphID);
sqlite->runInsert(sqlStatementUpdateEndTime);


int conResultWr = write(connFd, DONE.c_str(), DONE.length());
if (conResultWr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;

[Codecov warning: added lines #L1486-L1487 not covered by tests]
}
resultWr = write(connFd, "\r\n", 2);
if (resultWr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;

[Codecov warning: added lines #L1492-L1493 not covered by tests]
}
}
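For illustration only (not part of the diff), the console exchange this handler produces when a client issues the new adhdfs command could look roughly like the following; the file path and the y/n answers are placeholder values, the prompts are the strings written above, and the final acknowledgement is whatever the DONE constant holds:

adhdfs
Do you want to use the default HDFS server(y/n)?
y
HDFS file path:
/user/jasminegraph/edges.txt
Is this a directed graph(y/n)?
y
<DONE acknowledgement once streaming to the partitions finishes>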

static void stop_stream_kafka_command(int connFd, KafkaConnector *kstream, bool *loop_exit_p) {
frontend_logger.info("Start serving `" + STOP_STREAM_KAFKA + "` command");
frontend_logger.info("Started serving `" + STOP_STREAM_KAFKA + "` command");
// Unsubscribe the kafka consumer.
kstream->Unsubscribe();
string message = "Successfully stop `" + stream_topic_name + "` input kafka stream";
1 change: 1 addition & 0 deletions src/frontend/JasmineGraphFrontEndProtocol.cpp
@@ -24,6 +24,7 @@ const string SHTDN = "shdn";
const string SEND = "send";
const string ERROR = "error";
const string ADD_STREAM_KAFKA = "adstrmk";
const string ADD_STREAM_HDFS = "adhdfs";
const string ADD_STREAM_KAFKA_CSV = "adstrmkcsv";
const string STOP_STREAM_KAFKA = "stopstrm";
const string STOP_STRIAN = "stopstrian";
1 change: 1 addition & 0 deletions src/frontend/JasmineGraphFrontEndProtocol.h
@@ -62,6 +62,7 @@ extern const string OUTPUT_FILE_NAME;
extern const string OUTPUT_FILE_PATH;
extern const string ADD_STREAM;
extern const string ADD_STREAM_KAFKA;
extern const string ADD_STREAM_HDFS;
extern const string STRM_ACK;
extern const string ADD_STREAM_KAFKA;
extern const string STREAM_TOPIC_NAME;
14 changes: 14 additions & 0 deletions src/localstore/incremental/JasmineGraphIncrementalLocalStore.cpp
@@ -106,3 +106,17 @@
// TODO tmkasun: handle JSON errors
}
}

void JasmineGraphIncrementalLocalStore::addLocalEdge(const std::pair<std::string, std::string> &edge) {

[Codecov warning: added line #L110 not covered by tests]
RelationBlock* newRelation;
newRelation = this->nm->addLocalEdge({edge.first, edge.second});
// [ToDo]:implement add edge properties
incremental_localstore_logger.debug("Local edge ("+edge.first+"-> "+edge.second+" ) added successfully");
}

[Codecov warning: added line #L115 not covered by tests]

void JasmineGraphIncrementalLocalStore::addCentralEdge(const std::pair<std::string, std::string> &edge) {

[Codecov warning: added line #L117 not covered by tests]
RelationBlock* newRelation;
newRelation = this->nm->addCentralEdge({edge.first, edge.second});
// [ToDo]:implement add edge properties
incremental_localstore_logger.debug("Central edge ("+edge.first+"-> "+edge.second+" ) added successfully");
}

[Codecov warning: added line #L122 not covered by tests]
2 changes: 2 additions & 0 deletions src/localstore/incremental/JasmineGraphIncrementalLocalStore.h
@@ -27,6 +27,8 @@ class JasmineGraphIncrementalLocalStore {
static std::pair<std::string, unsigned int> getIDs(std::string edgeString);
JasmineGraphIncrementalLocalStore(unsigned int graphID = 0,
unsigned int partitionID = 0, std::string openMode = "trunk");
void addLocalEdge(const std::pair<std::string, std::string> &edge);
void addCentralEdge(const std::pair<std::string, std::string> &edge);
};

#endif
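A minimal hypothetical usage sketch of the two methods declared above (not code from this PR); the graph ID, partition ID, edge endpoints, and the routing decision are placeholder assumptions standing in for whatever the partitioner determines:

#include <string>
#include <utility>
#include "JasmineGraphIncrementalLocalStore.h"

void routeEdgeExample() {
    unsigned int graphId = 1;        // placeholder graph ID
    unsigned int partitionId = 0;    // placeholder partition ID
    JasmineGraphIncrementalLocalStore store(graphId, partitionId);

    std::pair<std::string, std::string> edge{"100", "250"};  // source -> destination vertex IDs
    bool bothEndpointsInThisPartition = true;                // placeholder routing decision

    if (bothEndpointsInThisPartition) {
        store.addLocalEdge(edge);    // both endpoints live in this partition
    } else {
        store.addCentralEdge(edge);  // cut edge goes to the central store
    }
}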
4 changes: 3 additions & 1 deletion src/nativestore/DataPublisher.cpp
@@ -21,9 +21,10 @@

Logger data_publisher_logger;

DataPublisher::DataPublisher(int worker_port, std::string worker_address) {
DataPublisher::DataPublisher(int worker_port, std::string worker_address, int worker_data_port) {

[Codecov warning: added line #L24 not covered by tests]
this->worker_port = worker_port;
this->worker_address = worker_address;
this->data_port = worker_data_port;

[Codecov warning: added line #L27 not covered by tests]
struct hostent *server;

server = gethostbyname(worker_address.c_str());
@@ -42,6 +43,7 @@
if (Utils::connect_wrapper(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
data_publisher_logger.error("Connection Failed!");
}
data_publisher_logger.info("socket created");
}

DataPublisher::~DataPublisher() {
3 changes: 2 additions & 1 deletion src/nativestore/DataPublisher.h
@@ -31,9 +31,10 @@ class DataPublisher {
struct sockaddr_in serv_addr;
std::string worker_address, message;
char buffer[1024] = {0};
int data_port;

public:
DataPublisher(int, std::string);
DataPublisher(int, std::string, int);
void publish(std::string);
void publish_relation(std::string);
void publish_edge(std::string);