Skip to content

Commit 27b78f2

Browse files
authored
feat(interactive): Enable starting compiler as a subprocess of interactive server (alibaba#3650)
As our `AdminService` is able to switch services between graphs, we should also be able to switch graphs on the compiler service. For now, we make `GraphServer` a subprocess of `InteractiveServer`.
1 parent 577a231 commit 27b78f2

File tree

8 files changed

+235
-17
lines changed

8 files changed

+235
-17
lines changed

docs/flex/interactive/development/admin_service.md

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1041,4 +1041,18 @@ curl -X GET -H "Content-Type: application/json" "http://[host]/v1/node/status"
10411041

10421042
#### Status Codes
10431043
- `200 OK`: Request successful.
1044-
- `500 Internal Error`: Server internal Error.
1044+
- `500 Internal Error`: Server internal Error.
1045+
1046+
## Enable AdminService in development
1047+
1048+
To start the admin service in development, use the command line argument `--enable-admin-service true`. `${ENGINE_CONFIG}` specifies the configuration for the Interactive query engine, see [engine-configuration](https://graphscope.io/docs/flex/interactive/configuration). `${WORKSPACE}` points to the directory where Interactive-related data is maintained.
1049+
1050+
```bash
1051+
./bin/interactive_server -c ${ENGINE_CONFIG} -w ${WORKSPACE} --enable-admin-service true
1052+
```
1053+
1054+
### Start Compiler Service
1055+
The Compiler service can be started as a subprocess of the AdminService. This ensures that when switching graphs in the AdminService, the Compiler service also switches to the corresponding graph's schema. This is the default behavior in the current Interactive.
1056+
1057+
```bash
1058+
./bin/interactive_server -c ${ENGINE_CONFIG} -w ${WORKSPACE} --enable-admin-service true --start-compiler true

flex/bin/interactive_server.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,9 @@ int main(int argc, char** argv) {
149149
bpo::value<unsigned>()->default_value(2),
150150
"worker thread number")(
151151
"enable-trace", bpo::value<bool>()->default_value(false),
152-
"whether to enable opentelemetry tracing");
152+
"whether to enable opentelemetry tracing")(
153+
"start-compiler", bpo::value<bool>()->default_value(false),
154+
"whether or not to start compiler");
153155

154156
setenv("TZ", "Asia/Shanghai", 1);
155157
tzset();
@@ -179,6 +181,7 @@ int main(int argc, char** argv) {
179181
server::ServiceConfig service_config = node.as<server::ServiceConfig>();
180182
service_config.engine_config_path = engine_config_file;
181183
service_config.start_admin_service = vm["enable-admin-service"].as<bool>();
184+
service_config.start_compiler = vm["start-compiler"].as<bool>();
182185

183186
auto& db = gs::GraphDB::get();
184187

flex/engines/http_server/handler/hqps_http_handler.cc

Lines changed: 45 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ hqps_ic_handler::hqps_ic_handler(uint32_t init_group_id, uint32_t max_group_id,
6363
max_group_id_(max_group_id),
6464
group_inc_step_(group_inc_step),
6565
shard_concurrency_(shard_concurrency),
66-
executor_idx_(0) {
66+
executor_idx_(0),
67+
is_cancelled_(false) {
6768
executor_refs_.reserve(shard_concurrency_);
6869
hiactor::scope_builder builder;
6970
builder.set_shard(hiactor::local_shard_id())
@@ -87,10 +88,15 @@ seastar::future<> hqps_ic_handler::cancel_current_scope() {
8788
LOG(INFO) << "Cancel IC scope successfully!";
8889
// clear the actor refs
8990
executor_refs_.clear();
91+
is_cancelled_ = true;
9092
return seastar::make_ready_future<>();
9193
});
9294
}
9395

96+
bool hqps_ic_handler::is_current_scope_cancelled() const {
97+
return is_cancelled_;
98+
}
99+
94100
bool hqps_ic_handler::create_actors() {
95101
if (executor_refs_.size() > 0) {
96102
LOG(ERROR) << "The actors have been already created!";
@@ -114,6 +120,7 @@ bool hqps_ic_handler::create_actors() {
114120
for (unsigned i = 0; i < shard_concurrency_; ++i) {
115121
executor_refs_.emplace_back(builder.build_ref<executor_ref>(i));
116122
}
123+
is_cancelled_ = false; // locked outside
117124
return true;
118125
}
119126

@@ -166,7 +173,8 @@ hqps_adhoc_query_handler::hqps_adhoc_query_handler(
166173
max_group_id_(max_group_id),
167174
group_inc_step_(group_inc_step),
168175
shard_concurrency_(shard_concurrency),
169-
executor_idx_(0) {
176+
executor_idx_(0),
177+
is_cancelled_(false) {
170178
executor_refs_.reserve(shard_concurrency_);
171179
{
172180
hiactor::scope_builder builder;
@@ -213,10 +221,15 @@ seastar::future<> hqps_adhoc_query_handler::cancel_current_scope() {
213221
executor_refs_.clear();
214222
codegen_actor_refs_.clear();
215223
LOG(INFO) << "Clear actor refs successfully!";
224+
is_cancelled_ = true;
216225
return seastar::make_ready_future<>();
217226
});
218227
}
219228

229+
bool hqps_adhoc_query_handler::is_current_scope_cancelled() const {
230+
return is_cancelled_;
231+
}
232+
220233
bool hqps_adhoc_query_handler::create_actors() {
221234
if (executor_refs_.size() > 0 || codegen_actor_refs_.size() > 0) {
222235
LOG(ERROR) << "The actors have been already created!";
@@ -255,6 +268,7 @@ bool hqps_adhoc_query_handler::create_actors() {
255268
hiactor::scope<hiactor::actor_group>(cur_codegen_group_id_));
256269
codegen_actor_refs_.emplace_back(builder.build_ref<codegen_actor_ref>(0));
257270
}
271+
is_cancelled_ = false;
258272
return true;
259273
}
260274

@@ -378,6 +392,11 @@ uint16_t hqps_http_handler::get_port() const { return http_port_; }
378392

379393
bool hqps_http_handler::is_running() const { return running_.load(); }
380394

395+
bool hqps_http_handler::is_actors_running() const {
396+
return !ic_handler_->is_current_scope_cancelled() &&
397+
!adhoc_query_handler_->is_current_scope_cancelled();
398+
}
399+
381400
void hqps_http_handler::start() {
382401
auto fut = seastar::alien::submit_to(
383402
*seastar::alien::internal::default_instance, 0, [this] {
@@ -409,15 +428,30 @@ void hqps_http_handler::stop() {
409428

410429
seastar::future<> hqps_http_handler::stop_query_actors() {
411430
// First cancel the scope.
412-
return ic_handler_->cancel_current_scope()
413-
.then([this] {
414-
LOG(INFO) << "Cancel ic scope";
415-
return adhoc_query_handler_->cancel_current_scope();
416-
})
417-
.then([] {
418-
LOG(INFO) << "Cancel adhoc scope";
419-
return seastar::make_ready_future<>();
420-
});
431+
if (ic_handler_->is_current_scope_cancelled()) {
432+
LOG(INFO) << "IC scope has been cancelled!";
433+
if (adhoc_query_handler_->is_current_scope_cancelled()) {
434+
LOG(INFO) << "Adhoc scope has been cancelled!";
435+
return seastar::make_ready_future<>();
436+
} else {
437+
return adhoc_query_handler_->cancel_current_scope();
438+
}
439+
} else {
440+
return ic_handler_->cancel_current_scope()
441+
.then([this] {
442+
LOG(INFO) << "Cancel ic scope";
443+
if (adhoc_query_handler_->is_current_scope_cancelled()) {
444+
LOG(INFO) << "Adhoc scope has been cancelled!";
445+
return seastar::make_ready_future<>();
446+
} else {
447+
return adhoc_query_handler_->cancel_current_scope();
448+
}
449+
})
450+
.then([] {
451+
LOG(INFO) << "Cancel adhoc scope";
452+
return seastar::make_ready_future<>();
453+
});
454+
}
421455
}
422456

423457
void hqps_http_handler::start_query_actors() {

flex/engines/http_server/handler/hqps_http_handler.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ class hqps_ic_handler : public seastar::httpd::handler_base {
3434

3535
seastar::future<> cancel_current_scope();
3636

37+
bool is_current_scope_cancelled() const;
38+
3739
seastar::future<std::unique_ptr<seastar::httpd::reply>> handle(
3840
const seastar::sstring& path,
3941
std::unique_ptr<seastar::httpd::request> req,
@@ -45,6 +47,7 @@ class hqps_ic_handler : public seastar::httpd::handler_base {
4547
const uint32_t shard_concurrency_;
4648
uint32_t executor_idx_;
4749
std::vector<executor_ref> executor_refs_;
50+
bool is_cancelled_;
4851
};
4952

5053
class hqps_adhoc_query_handler : public seastar::httpd::handler_base {
@@ -58,6 +61,8 @@ class hqps_adhoc_query_handler : public seastar::httpd::handler_base {
5861

5962
seastar::future<> cancel_current_scope();
6063

64+
bool is_current_scope_cancelled() const;
65+
6166
bool create_actors();
6267

6368
seastar::future<std::unique_ptr<seastar::httpd::reply>> handle(
@@ -72,6 +77,7 @@ class hqps_adhoc_query_handler : public seastar::httpd::handler_base {
7277
uint32_t executor_idx_;
7378
std::vector<executor_ref> executor_refs_;
7479
std::vector<codegen_actor_ref> codegen_actor_refs_;
80+
bool is_cancelled_;
7581
};
7682

7783
class hqps_exit_handler : public seastar::httpd::handler_base {
@@ -94,6 +100,8 @@ class hqps_http_handler {
94100

95101
bool is_running() const;
96102

103+
bool is_actors_running() const;
104+
97105
seastar::future<> stop_query_actors();
98106

99107
void start_query_actors();

flex/engines/http_server/service/hqps_service.cc

Lines changed: 118 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,13 @@ ServiceConfig::ServiceConfig()
2424
dpdk_mode(false),
2525
enable_thread_resource_pool(true),
2626
external_thread_num(2),
27-
start_admin_service(true) {}
27+
start_admin_service(true),
28+
start_compiler(false) {}
2829

2930
const std::string HQPSService::DEFAULT_GRAPH_NAME = "modern_graph";
31+
const std::string HQPSService::DEFAULT_INTERACTIVE_HOME = "/opt/flex/";
32+
const std::string HQPSService::COMPILER_SERVER_CLASS_NAME =
33+
"com.alibaba.graphscope.GraphServer";
3034

3135
HQPSService& HQPSService::get() {
3236
static HQPSService instance;
@@ -48,12 +52,16 @@ void HQPSService::init(const ServiceConfig& config) {
4852
initialized_.store(true);
4953
service_config_ = config;
5054
gs::init_cpu_usage_watch();
55+
if (config.start_compiler) {
56+
start_compiler_subprocess();
57+
}
5158
}
5259

5360
HQPSService::~HQPSService() {
5461
if (actor_sys_) {
5562
actor_sys_->terminate();
5663
}
64+
stop_compiler_subprocess();
5765
}
5866

5967
const ServiceConfig& HQPSService::get_service_config() const {
@@ -111,7 +119,15 @@ void HQPSService::run_and_wait_for_exit() {
111119

112120
void HQPSService::set_exit_state() { running_.store(false); }
113121

122+
bool HQPSService::is_actors_running() const {
123+
if (query_hdl_) {
124+
return query_hdl_->is_actors_running();
125+
} else
126+
return false;
127+
}
128+
114129
seastar::future<> HQPSService::stop_query_actors() {
130+
std::unique_lock<std::mutex> lock(mtx_);
115131
if (query_hdl_) {
116132
return query_hdl_->stop_query_actors();
117133
} else {
@@ -122,11 +138,112 @@ seastar::future<> HQPSService::stop_query_actors() {
122138
}
123139

124140
void HQPSService::start_query_actors() {
141+
std::unique_lock<std::mutex> lock(mtx_);
125142
if (query_hdl_) {
126143
query_hdl_->start_query_actors();
127144
} else {
128145
std::cerr << "Query handler has not been inited!" << std::endl;
129146
return;
130147
}
131148
}
149+
150+
bool HQPSService::start_compiler_subprocess(
151+
const std::string& graph_schema_path) {
152+
LOG(INFO) << "Start compiler subprocess";
153+
stop_compiler_subprocess();
154+
auto java_bin_path = boost::process::search_path("java");
155+
if (java_bin_path.empty()) {
156+
std::cerr << "Java binary not found in PATH!" << std::endl;
157+
return false;
158+
}
159+
// try to find compiler jar from env.
160+
auto interactive_class_path = find_interactive_class_path();
161+
if (interactive_class_path.empty()) {
162+
std::cerr << "Interactive home not found!" << std::endl;
163+
return false;
164+
}
165+
std::stringstream ss;
166+
ss << "java -cp " << interactive_class_path;
167+
if (!graph_schema_path.empty()) {
168+
ss << " -Dgraph.schema=" << graph_schema_path;
169+
}
170+
ss << " " << COMPILER_SERVER_CLASS_NAME;
171+
ss << " " << service_config_.engine_config_path;
172+
auto cmd_str = ss.str();
173+
LOG(INFO) << "Start compiler with command: " << cmd_str;
174+
auto compiler_log = WorkDirManipulator::GetCompilerLogFile();
175+
176+
compiler_process_ =
177+
boost::process::child(cmd_str, boost::process::std_out > compiler_log,
178+
boost::process::std_err > compiler_log);
179+
LOG(INFO) << "Compiler process started with pid: " << compiler_process_.id();
180+
// sleep for a while to wait for the compiler to start
181+
std::this_thread::sleep_for(std::chrono::seconds(4));
182+
// check if the compiler process is still running
183+
if (!compiler_process_.running()) {
184+
LOG(ERROR) << "Compiler process failed to start!";
185+
return false;
186+
}
187+
return true;
188+
}
189+
190+
bool HQPSService::stop_compiler_subprocess() {
191+
if (compiler_process_.running()) {
192+
LOG(INFO) << "Terminate previous compiler process with pid: "
193+
<< compiler_process_.id();
194+
compiler_process_.terminate();
195+
}
196+
return true;
197+
}
198+
199+
std::string HQPSService::find_interactive_class_path() {
200+
std::string interactive_home = DEFAULT_INTERACTIVE_HOME;
201+
if (std::getenv("INTERACTIVE_HOME")) {
202+
// INTERACTIVE_HOME is set in the environment: use it instead of DEFAULT_INTERACTIVE_HOME
203+
interactive_home = std::getenv("INTERACTIVE_HOME");
204+
}
205+
206+
// check compiler*.jar in DEFAULT_INTERACTIVE_HOME/lib/
207+
LOG(INFO) << "try to find compiler*.jar in " << interactive_home << "/lib/";
208+
auto lib_path = interactive_home + "/lib/";
209+
if (boost::filesystem::exists(lib_path)) {
210+
for (auto& p : boost::filesystem::directory_iterator(lib_path)) {
211+
if (p.path().filename().string().find("compiler") != std::string::npos &&
212+
p.path().extension() == ".jar") {
213+
return lib_path + "* -Djna.library.path=" + lib_path;
214+
}
215+
}
216+
}
217+
// if not, try the relative path from current binary's path
218+
auto current_binary_path = boost::filesystem::canonical("/proc/self/exe");
219+
auto current_binary_dir = current_binary_path.parent_path();
220+
auto ir_core_lib_path =
221+
current_binary_dir /
222+
"../../../interactive_engine/executor/ir/target/release/";
223+
if (!boost::filesystem::exists(ir_core_lib_path)) {
224+
LOG(ERROR) << "ir_core_lib_path not found";
225+
return "";
226+
}
227+
// compiler*.jar in
228+
// current_binary_dir/../../../interactive_engine/compiler/target/
229+
auto compiler_path =
230+
current_binary_dir / "../../../interactive_engine/compiler/target/";
231+
LOG(INFO) << "try to find compiler*.jar in " << compiler_path;
232+
if (boost::filesystem::exists(compiler_path)) {
233+
for (auto& p : boost::filesystem::directory_iterator(compiler_path)) {
234+
if (p.path().filename().string().find("compiler") != std::string::npos &&
235+
p.path().extension() == ".jar") {
236+
auto libs_path = compiler_path / "libs";
237+
// combine the path with the libs folder
238+
if (boost::filesystem::exists(libs_path)) {
239+
return p.path().string() + ":" + libs_path.string() +
240+
"/* -Djna.library.path=" + ir_core_lib_path.string();
241+
}
242+
}
243+
}
244+
}
245+
LOG(ERROR) << "Compiler jar not found";
246+
return "";
247+
}
248+
132249
} // namespace server

0 commit comments

Comments
 (0)