Skip to content

Commit

Permalink
Merge pull request #254 from ChandiH/feature/frontend
Browse files Browse the repository at this point in the history
JasmineGraph Frontend Shell
  • Loading branch information
miyurud authored Jan 14, 2025
2 parents 5538e94 + e8d85e7 commit 6bc8c99
Show file tree
Hide file tree
Showing 20 changed files with 1,104 additions and 295 deletions.
6 changes: 6 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ set(SOURCES src/backend/JasmineGraphBackend.cpp
src/util/dbinterface/DBInterface.cpp
src/query/processor/cypher/semanticanalyzer/SemanticAnalyzer.cpp
src/query/processor/cypher/util/Const.cpp
src/frontend/ui/JasmineGraphFrontEndUIProtocol.h
src/frontend/ui/JasmineGraphFrontEndUIProtocol.cpp
src/frontend/ui/JasmineGraphFrontEndUI.cpp
src/frontend/ui/JasmineGraphFrontEndUI.h
src/frontend/core/common/JasmineGraphFrontendCommon.cpp
src/frontend/core/common/JasmineGraphFrontendCommon.h
)

if (CMAKE_BUILD_TYPE STREQUAL "DEBUG")
Expand Down
2 changes: 1 addition & 1 deletion cmake_modules/CodeCoverage.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ function(setup_target_for_coverage_gcovr_xml)
# Running gcovr
set(GCOVR_XML_CMD
${GCOVR_PATH} --xml ${Coverage_NAME}.xml -r ${BASEDIR} ${GCOVR_ADDITIONAL_ARGS}
${GCOVR_EXCLUDE_ARGS} --object-directory=${PROJECT_BINARY_DIR}
${GCOVR_EXCLUDE_ARGS} --object-directory=${PROJECT_BINARY_DIR} --gcov-ignore-parse-errors=negative_hits.warn
)

if(CODE_COVERAGE_VERBOSE)
Expand Down
295 changes: 36 additions & 259 deletions src/frontend/JasmineGraphFrontEnd.cpp

Large diffs are not rendered by default.

24 changes: 1 addition & 23 deletions src/frontend/JasmineGraphFrontEnd.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,34 +52,12 @@ class JasmineGraphFrontEnd {

int run();

static bool graphExists(std::string basic_string, SQLiteDBInterface *sqlite);

static bool modelExists(std::string basic_string, SQLiteDBInterface *sqlite);

static bool graphExistsByID(std::string id, SQLiteDBInterface *sqlite);

static bool modelExistsByID(std::string id, SQLiteDBInterface *sqlite);

static void removeGraph(std::string graphID, SQLiteDBInterface *sqlite, std::string masterIP);

static void getAndUpdateUploadTime(std::string graphID, SQLiteDBInterface *sqlite);

static bool isGraphActiveAndTrained(std::string graphID, SQLiteDBInterface *sqlite);

static map<long, long> getOutDegreeDistributionHashMap(map<long, unordered_set<long>> graphMap);

static bool isGraphActive(string graphID, SQLiteDBInterface *sqlite);

static int getUid();

static long getSLAForGraphId(SQLiteDBInterface *sqlite, PerformanceSQLiteDBInterface *perfSqlite,
std::string graphId, std::string command, std::string category);

static void scheduleStrianJobs(JobRequest &jobDetails, std::priority_queue<JobRequest> &jobQueue,
JobScheduler *jobScheduler, bool *strian_exist);

static int getRunningHighPriorityTaskCount();
static bool areRunningJobsForSameGraph();

static bool strian_exit;
std::map<std::string, std::atomic<bool>> *streamsState;
std::map<std::string, std::thread> streamingThreads;
Expand Down
1 change: 0 additions & 1 deletion src/frontend/JasmineGraphFrontEndProtocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,3 @@ const string COMMAND = "command";
const string PRIORITY = "priority(>=1)";
const string INVALID_FORMAT = "Invalid message format";
const string CYPHER_AST = "cypher-ast";

230 changes: 230 additions & 0 deletions src/frontend/core/common/JasmineGraphFrontendCommon.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
/**
Copyright 2019 JasminGraph Team
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#include "JasmineGraphFrontendCommon.h"
#include "../../JasmineGraphFrontEndProtocol.h"
#include "../../../server/JasmineGraphServer.h"
#include "../../../util/logger/Logger.h"

Logger common_logger;

/**
* This method checks if a graph exists in JasmineGraph.
* This method uses the unique path of the graph.
* @param basic_string
* @param dummyPt
* @return
*/
bool JasmineGraphFrontEndCommon::graphExists(string path, SQLiteDBInterface *sqlite) {
string stmt = "SELECT COUNT( * ) FROM graph WHERE upload_path LIKE '" + path +
"' AND graph_status_idgraph_status = '" + to_string(Conts::GRAPH_STATUS::OPERATIONAL) + "';";
std::vector<vector<pair<string, string>>> v = sqlite->runSelect(stmt);
return (std::stoi(v[0][0].second) != 0);
}

/**
* This method checks if an accessible graph exists in JasmineGraph with the same unique ID.
* @param id
* @param dummyPt
* @return
*/
bool JasmineGraphFrontEndCommon::graphExistsByID(string id, SQLiteDBInterface *sqlite) {
string stmt = "SELECT COUNT( * ) FROM graph WHERE idgraph = " + id;
std::vector<vector<pair<string, string>>> v = sqlite->runSelect(stmt);
return (std::stoi(v[0][0].second) != 0);
}

/**
* This method removes a graph from JasmineGraph
*/
void JasmineGraphFrontEndCommon::removeGraph(std::string graphID, SQLiteDBInterface *sqlite, std::string masterIP) {
vector<pair<string, string>> hostHasPartition;
vector<vector<pair<string, string>>> hostPartitionResults = sqlite->runSelect(
"SELECT name, partition_idpartition FROM worker_has_partition INNER JOIN worker ON "
"worker_has_partition.worker_idworker = worker.idworker WHERE partition_graph_idgraph = " +
graphID + ";");
for (vector<vector<pair<string, string>>>::iterator i = hostPartitionResults.begin();
i != hostPartitionResults.end(); ++i) {
int count = 0;
string hostname;
string partitionID;
for (std::vector<pair<string, string>>::iterator j = (i->begin()); j != i->end(); ++j) {
if (count == 0) {
hostname = j->second;
} else {
partitionID = j->second;
hostHasPartition.push_back(pair<string, string>(hostname, partitionID));
}
count++;
}
}
for (std::vector<pair<string, string>>::iterator j = (hostHasPartition.begin()); j != hostHasPartition.end(); ++j) {
common_logger.info("HOST ID : " + j->second + " PARTITION ID : " + j->first);
}
sqlite->runUpdate("UPDATE graph SET graph_status_idgraph_status = " + to_string(Conts::GRAPH_STATUS::DELETING) +
" WHERE idgraph = " + graphID);

JasmineGraphServer::removeGraph(hostHasPartition, graphID, masterIP);

sqlite->runUpdate("DELETE FROM worker_has_partition WHERE partition_graph_idgraph = " + graphID);
sqlite->runUpdate("DELETE FROM partition WHERE graph_idgraph = " + graphID);
sqlite->runUpdate("DELETE FROM graph WHERE idgraph = " + graphID);
}

/**
* This method checks whether the graph is active and trained
* @param graphID
* @param dummyPt
* @return
*/
bool JasmineGraphFrontEndCommon::isGraphActiveAndTrained(std::string graphID, SQLiteDBInterface *sqlite) {
string stmt = "SELECT COUNT( * ) FROM graph WHERE idgraph LIKE '" + graphID +
"' AND graph_status_idgraph_status = '" + to_string(Conts::GRAPH_STATUS::OPERATIONAL) +
"' AND train_status = '" + (Conts::TRAIN_STATUS::TRAINED) + "';";
std::vector<vector<pair<string, string>>> v = sqlite->runSelect(stmt);
return (std::stoi(v[0][0].second) != 0);
}

/**
* This method checks whether the graph is active
* @param graphID
* @param dummyPt
* @return
*/
bool JasmineGraphFrontEndCommon::isGraphActive(std::string graphID, SQLiteDBInterface *sqlite) {
string stmt = "SELECT COUNT( * ) FROM graph WHERE idgraph LIKE '" + graphID +
"' AND graph_status_idgraph_status = '" + to_string(Conts::GRAPH_STATUS::OPERATIONAL) + "';";
std::vector<vector<pair<string, string>>> v = sqlite->runSelect(stmt);
return (std::stoi(v[0][0].second) != 0);
}

bool JasmineGraphFrontEndCommon::modelExistsByID(string id, SQLiteDBInterface *sqlite) {
string stmt = "SELECT COUNT( * ) FROM model WHERE idmodel = " + id +
" and model_status_idmodel_status = " + to_string(Conts::GRAPH_STATUS::OPERATIONAL);
std::vector<vector<pair<string, string>>> v = sqlite->runSelect(stmt);
return (std::stoi(v[0][0].second) != 0);
}

bool JasmineGraphFrontEndCommon::modelExists(string path, SQLiteDBInterface *sqlite) {
string stmt = "SELECT COUNT( * ) FROM model WHERE upload_path LIKE '" + path +
"' AND model_status_idmodel_status = '" + to_string(Conts::GRAPH_STATUS::OPERATIONAL) + "';";
std::vector<vector<pair<string, string>>> v = sqlite->runSelect(stmt);
return (std::stoi(v[0][0].second) != 0);
}

void JasmineGraphFrontEndCommon::getAndUpdateUploadTime(std::string graphID, SQLiteDBInterface *sqlite) {
struct tm tm;
vector<vector<pair<string, string>>> uploadStartFinishTimes =
sqlite->runSelect("SELECT upload_start_time,upload_end_time FROM graph WHERE idgraph = '" + graphID + "'");
string startTime = uploadStartFinishTimes[0][0].second;
string endTime = uploadStartFinishTimes[0][1].second;
string sTime = startTime.substr(startTime.size() - 14, startTime.size() - 5);
string eTime = endTime.substr(startTime.size() - 14, startTime.size() - 5);
strptime(sTime.c_str(), "%H:%M:%S", &tm);
time_t start = mktime(&tm);
strptime(eTime.c_str(), "%H:%M:%S", &tm);
time_t end = mktime(&tm);
double difTime = difftime(end, start);
sqlite->runUpdate("UPDATE graph SET upload_time = " + to_string(difTime) + " WHERE idgraph = " + graphID);
common_logger.info("Upload time updated in the database");
}

map<long, long> JasmineGraphFrontEndCommon::getOutDegreeDistributionHashMap(map<long, unordered_set<long>> graphMap) {
map<long, long> distributionHashMap;

for (map<long, unordered_set<long>>::iterator it = graphMap.begin(); it != graphMap.end(); ++it) {
long distribution = (it->second).size();
distributionHashMap[it->first] = distribution;
}
return distributionHashMap;
}

int JasmineGraphFrontEndCommon::getUid() {
static std::atomic<std::uint32_t> uid{0};
return ++uid;
}

long JasmineGraphFrontEndCommon::getSLAForGraphId(SQLiteDBInterface *sqlite, PerformanceSQLiteDBInterface *perfSqlite,
std::string graphId, std::string command, std::string category) {
long graphSLAValue = 0;

string sqlStatement =
"SELECT worker_idworker, name,ip,user,server_port,server_data_port,partition_idpartition "
"FROM worker_has_partition INNER JOIN worker ON worker_has_partition.worker_idworker=worker.idworker "
"WHERE partition_graph_idgraph=" +
graphId + ";";

std::vector<vector<pair<string, string>>> results = sqlite->runSelect(sqlStatement);
int partitionCount = results.size();

string graphSlaQuery =
"select graph_sla.sla_value from graph_sla,sla_category where graph_sla.id_sla_category=sla_category.id "
"and sla_category.command='" +
command + "' and sla_category.category='" + category +
"' and "
"graph_sla.graph_id='" +
graphId + "' and graph_sla.partition_count='" + std::to_string(partitionCount) + "';";

std::vector<vector<pair<string, string>>> slaResults = perfSqlite->runSelect(graphSlaQuery);

if (slaResults.size() > 0) {
string currentSlaString = slaResults[0][0].second;
long graphSLAValue = atol(currentSlaString.c_str());
}

return graphSLAValue;
}

std::vector<std::vector<std::pair<std::string, std::string>>>
JasmineGraphFrontEndCommon::getGraphData(SQLiteDBInterface *sqlite) {
return sqlite->runSelect("SELECT idgraph, name, upload_path, graph_status_idgraph_status, "
"vertexcount, edgecount, centralpartitioncount FROM graph;");
}

bool JasmineGraphFrontEndCommon::checkServerBusy(std::atomic<int> *currentFESession, int connFd) {
if (*currentFESession >= Conts::MAX_FE_SESSIONS) {
if (!Utils::send_str_wrapper(connFd, "JasmineGraph server is busy. Please try again later.")) {
common_logger.error("Error writing to socket");
}
close(connFd);
return true;
}
(*currentFESession)++; // Increment only if not busy
return false;
}

std::string JasmineGraphFrontEndCommon::readAndProcessInput(int connFd, char* data, int &failCnt) {
std::string line = Utils::read_str_wrapper(connFd, data, FRONTEND_DATA_LENGTH, true);
if (line.empty()) {
failCnt++;
if (failCnt > 4) {
return "";
}
sleep(1);
} else {
failCnt = 0;
}
return Utils::trim_copy(line);
}

std::string JasmineGraphFrontEndCommon::getPartitionCount(std::string path) {
if (Utils::getJasmineGraphProperty("org.jasminegraph.autopartition.enabled") != "true") {
return "";
}
ifstream dataFile(path);
size_t edges = std::count(std::istreambuf_iterator<char>(dataFile), std::istreambuf_iterator<char>(), '\n');
dataFile.close();
int partCnt = (int)round(pow(edges, 0.2) / 6);
if (partCnt < 2) partCnt = 2;
return to_string(partCnt);
}
57 changes: 57 additions & 0 deletions src/frontend/core/common/JasmineGraphFrontendCommon.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
Copyright 2019 JasminGraph Team
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#ifndef JASMINEGRAPHFRONTENDCOMMON_H
#define JASMINEGRAPHFRONTENDCOMMON_H

#include <map>
#include <thread>

#include "../../../metadb/SQLiteDBInterface.h"
#include "../../../query/algorithms/triangles/Triangles.h"

class JasmineGraphFrontEndCommon {
public:
static bool graphExists(std::string basic_string, SQLiteDBInterface *sqlite);

static bool graphExistsByID(std::string id, SQLiteDBInterface *sqlite);

static void removeGraph(std::string graphID, SQLiteDBInterface *sqlite, std::string masterIP);

static bool isGraphActive(string graphID, SQLiteDBInterface *sqlite);

static bool modelExists(std::string basic_string, SQLiteDBInterface *sqlite);

static bool modelExistsByID(std::string id, SQLiteDBInterface *sqlite);

static void getAndUpdateUploadTime(std::string graphID, SQLiteDBInterface *sqlite);

static bool isGraphActiveAndTrained(std::string graphID, SQLiteDBInterface *sqlite);

static map<long, long> getOutDegreeDistributionHashMap(map<long, unordered_set<long>> graphMap);

static int getUid();

static long getSLAForGraphId(SQLiteDBInterface *sqlite, PerformanceSQLiteDBInterface *perfSqlite,
std::string graphId, std::string command, std::string category);

static std::vector<std::vector<std::pair<std::string, std::string>>> getGraphData(SQLiteDBInterface *sqlite);

static bool checkServerBusy(std::atomic<int> *currentFESession, int connFd);

static std::string readAndProcessInput(int connFd, char* data, int &failCnt);

static std::string getPartitionCount(std::string path);
};

#endif // JASMINEGRAPHFRONTENDCOMMON_H
Loading

0 comments on commit 6bc8c99

Please sign in to comment.