[Feature] support spark standalone deploy mode #4131

Open
wants to merge 4 commits into base: dev
Changes from 1 commit
@@ -89,6 +89,10 @@ object ConfigKeys {

val KEY_SPARK_YARN_EXECUTOR_NODE_LABEL = "spark.yarn.executor.nodeLabelExpression"

val MASTER_URL = "master"

val MASTER_WEB_URL = "master.web"

def KEY_SPARK_SQL(prefix: String = null): String =
s"${Option(prefix).getOrElse("")}sql"

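As context for the two new keys above (MASTER_URL and MASTER_WEB_URL), here is a minimal sketch of how the master endpoint might be consumed when assembling the --master value for a standalone submission. The helper class, method names, and property map are illustrative assumptions, not code from this PR.

// Illustrative sketch only: consuming the new "master" / "master.web" config keys
// when preparing spark-submit arguments for standalone deploy mode.
import java.util.Map;

public final class StandaloneMasterArgs {

    // Assumed to mirror ConfigKeys.MASTER_URL / MASTER_WEB_URL defined above.
    private static final String MASTER_URL = "master";
    private static final String MASTER_WEB_URL = "master.web";

    /** Returns the --master value, e.g. "spark://host:7077", from cluster properties. */
    public static String masterUrl(Map<String, String> clusterProps) {
        String master = clusterProps.get(MASTER_URL);
        if (master == null || master.isEmpty()) {
            throw new IllegalArgumentException(
                "standalone deploy mode requires the '" + MASTER_URL + "' property");
        }
        return master;
    }

    /** Returns the master web UI address (e.g. "http://host:8080") if configured. */
    public static String masterWebUrl(Map<String, String> clusterProps) {
        return clusterProps.getOrDefault(MASTER_WEB_URL, "");
    }

    private StandaloneMasterArgs() {}
}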
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.console.core.controller;

import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.base.exception.InternalException;
import org.apache.streampark.console.core.bean.ResponseResult;
import org.apache.streampark.console.core.entity.SparkCluster;
import org.apache.streampark.console.core.service.SparkClusterService;
import org.apache.streampark.console.core.util.ServiceHelper;

import org.apache.shiro.authz.annotation.RequiresPermissions;

import com.baomidou.mybatisplus.core.metadata.IPage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

@Slf4j
@Validated
@RestController
@RequestMapping("spark/cluster")
public class SparkClusterController {

@Autowired
private SparkClusterService sparkClusterService;

@PostMapping("page")
public RestResponse findPage(SparkCluster sparkCluster, RestRequest restRequest) {
IPage<SparkCluster> sparkClusters = sparkClusterService.findPage(sparkCluster, restRequest);
return RestResponse.success(sparkClusters);
}

@PostMapping("alive")
public RestResponse listAvailableCluster() {
List<SparkCluster> sparkClusters = sparkClusterService.listAvailableCluster();
return RestResponse.success(sparkClusters);
}

@PostMapping("list")
public RestResponse list() {
List<SparkCluster> sparkClusters = sparkClusterService.list();
return RestResponse.success(sparkClusters);
}

@PostMapping("remote_url")
public RestResponse remoteUrl(Long id) {
SparkCluster cluster = sparkClusterService.getById(id);
return RestResponse.success(cluster.getMasterUrl());
}

@PostMapping("check")
public RestResponse check(SparkCluster cluster) {
ResponseResult checkResult = sparkClusterService.check(cluster);
return RestResponse.success(checkResult);
}

@PostMapping("create")
@RequiresPermissions("cluster:create")
public RestResponse create(SparkCluster cluster) {
Long userId = ServiceHelper.getUserId();
Boolean success = sparkClusterService.create(cluster, userId);
return RestResponse.success(success);
}

@PostMapping("update")
@RequiresPermissions("cluster:update")
public RestResponse update(SparkCluster cluster) {
sparkClusterService.update(cluster);
return RestResponse.success();
}

@PostMapping("get")
public RestResponse get(Long id) throws InternalException {
SparkCluster cluster = sparkClusterService.getById(id);
return RestResponse.success(cluster);
}

@PostMapping("start")
public RestResponse start(SparkCluster cluster) {
sparkClusterService.updateClusterState(cluster.getId(), ClusterState.STARTING);
sparkClusterService.start(cluster);
return RestResponse.success();
}

@PostMapping("shutdown")
public RestResponse shutdown(SparkCluster cluster) {
if (sparkClusterService.allowShutdownCluster(cluster)) {
sparkClusterService.updateClusterState(cluster.getId(), ClusterState.CANCELLING);
sparkClusterService.shutdown(cluster);
}
return RestResponse.success();
}

@PostMapping("delete")
public RestResponse delete(SparkCluster cluster) {
sparkClusterService.remove(cluster.getId());
return RestResponse.success();
}
}
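For readability, a hedged sketch of the service contract this controller appears to depend on, inferred only from the call sites above; the real SparkClusterService (presumably built on MyBatis-Plus IService) may declare different signatures.

// Inferred sketch of the service methods used by SparkClusterController above.
// Signatures are assumptions derived from the call sites, not the actual interface.
import java.util.List;

import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.core.bean.ResponseResult;
import org.apache.streampark.console.core.entity.SparkCluster;

import com.baomidou.mybatisplus.core.metadata.IPage;

public interface SparkClusterServiceSketch {
    IPage<SparkCluster> findPage(SparkCluster cluster, RestRequest request); // page
    List<SparkCluster> listAvailableCluster();                               // alive
    List<SparkCluster> list();                                               // list
    SparkCluster getById(Long id);                                           // get / remote_url
    ResponseResult check(SparkCluster cluster);                              // check
    Boolean create(SparkCluster cluster, Long userId);                       // create
    void update(SparkCluster cluster);                                       // update
    void updateClusterState(Long id, ClusterState state);                    // start / shutdown
    void start(SparkCluster cluster);                                        // start
    boolean allowShutdownCluster(SparkCluster cluster);                      // shutdown guard
    void shutdown(SparkCluster cluster);                                     // shutdown
    void remove(Long id);                                                    // delete
}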
@@ -66,26 +66,38 @@ public class SparkApplication extends BaseEntity {

private Long teamId;

/** 1) spark jar 2) spark SQL 3) pyspark*/
/**
* 1) spark jar 2) spark SQL 3) pyspark
*/
private Integer jobType;

/** 1) Apache Spark 2) StreamPark Spark */
/**
* 1) Apache Spark 2) StreamPark Spark
*/
private Integer appType;

/** spark version */
/**
* spark version
*/
private Long versionId;

/** spark.app.name */
/**
* spark.app.name
*/
private String appName;

private Integer deployMode;

/** 1: cicd (build from csv) 2: upload (upload local jar job) */
/**
* 1: cicd (build from csv) 2: upload (upload local jar job)
*/
private Integer resourceFrom;

private Long projectId;

/** application module */
/**
* application module
*/
private String module;

private String mainClass;
@@ -104,7 +116,9 @@ public class SparkApplication extends BaseEntity {
*/
private String appProperties;

/** Arguments passed to the main method of your main class */
/**
* Arguments passed to the main method of your main class
*/
private String appArgs;

/**
@@ -124,29 +138,43 @@ public class SparkApplication extends BaseEntity {
*/
private transient String yarnQueueLabel;

/** The api server url of k8s. */
/**
* The api server url of k8s.
*/
private String k8sMasterUrl;

/** spark docker base image */
/**
* spark docker base image
*/
private String k8sContainerImage;

/** k8s image pull policy */
/**
* k8s image pull policy
*/
private int k8sImagePullPolicy;

/** k8s spark service account */
/**
* k8s spark service account
*/
private String k8sServiceAccount;

/** k8s namespace */
/**
* k8s namespace
*/
private String k8sNamespace = Constants.DEFAULT;

@TableField("HADOOP_USER")
private String hadoopUser;

/** max restart retries after job failed */
/**
* max restart retries after job failed
*/
@TableField(updateStrategy = FieldStrategy.IGNORED)
private Integer restartSize;

/** has restart count */
/**
* has restart count
*/
private Integer restartCount;

private Integer state;
@@ -161,17 +189,25 @@ public class SparkApplication extends BaseEntity {

private String description;

/** determine if tracking status */
/**
* determine if tracking status
*/
private Integer tracking;

/** task release status */
/**
* task release status
*/
@TableField("`release`")
private Integer release;

/** determine if a task needs to be built */
/**
* determine if a task needs to be built
*/
private Boolean build;

/** alert id */
/**
* alert id
*/
@TableField(updateStrategy = FieldStrategy.IGNORED)
private Long alertId;

@@ -188,21 +224,30 @@ public class SparkApplication extends BaseEntity {

private String tags;

/** scheduling */
/**
* scheduling
*/
private String driverCores;
private String driverMemory;
private String executorCores;
private String executorMemory;
private String executorMaxNums;

/** metrics of running job */
/**
* metrics of running job
*/
private Long numTasks;
private Long numCompletedTasks;
private Long numStages;
private Long numCompletedStages;
private Long usedMemory;
private Long usedVCores;

/**
* the cluster id bound to the task in remote mode
*/
private Long sparkClusterId;

private transient String teamResource;
private transient String dependency;
private transient Long sqlId;
@@ -217,10 +262,14 @@ public class SparkApplication extends BaseEntity {
private transient Integer format;
private transient String backUpDescription;

/** spark Web UI Url */
/**
* spark Web UI Url
*/
private transient String sparkRestUrl;

/** refer to {@link org.apache.streampark.flink.packer.pipeline.BuildPipeline} */
/**
* refer to {@link org.apache.streampark.flink.packer.pipeline.BuildPipeline}
*/
private transient Integer buildStatus;

private transient AppControl appControl;
@@ -253,7 +302,7 @@ public void resolveYarnQueue() {
* 1) if dynamic allocation is disabled, it depends on "spark.executor.instances".
* 2) if dynamic allocation is enabled and "spark.dynamicAllocation.maxExecutors" is set, it depends on it.
* 3) if dynamic allocation is enabled and "spark.dynamicAllocation.maxExecutors" is not set,
* the number of executors can go up to infinity.
* the number of executors can go up to infinity.
*
* @param map The configuration map integrated with default configurations,
* configuration template and custom configurations.
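A minimal illustrative sketch of the resolution rule described in this comment, assuming Spark's standard property names; the actual method body is collapsed in this diff and may differ.

// Illustrative only: max-executor resolution per the three cases documented above.
static int resolveMaxExecutors(java.util.Map<String, String> map) {
    boolean dynamic = Boolean.parseBoolean(
        map.getOrDefault("spark.dynamicAllocation.enabled", "false"));
    if (!dynamic) {
        // 1) dynamic allocation disabled: bounded by spark.executor.instances (assumed default of 2).
        return Integer.parseInt(map.getOrDefault("spark.executor.instances", "2"));
    }
    String max = map.get("spark.dynamicAllocation.maxExecutors");
    if (max != null && !max.isEmpty()) {
        // 2) dynamic allocation enabled with an explicit upper bound.
        return Integer.parseInt(max);
    }
    // 3) dynamic allocation enabled without a cap: effectively unbounded.
    return Integer.MAX_VALUE;
}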
@@ -328,7 +377,9 @@ public SparkDeployMode getDeployModeEnum() {
return SparkDeployMode.of(deployMode);
}

/** Local compilation and packaging working directory */
/**
* Local compilation and packaging working directory
*/
@JsonIgnore
public String getDistHome() {
String path = String.format("%s/%s/%s", Workspace.APP_LOCAL_DIST(), projectId.toString(), getModule());
@@ -350,7 +401,9 @@ public String getRemoteAppHome() {
return path;
}

/** Automatically identify remoteAppHome or localAppHome based on app SparkDeployMode */
/**
* Automatically identify remoteAppHome or localAppHome based on app SparkDeployMode
*/
@JsonIgnore
public String getAppHome() {
switch (this.getDeployModeEnum()) {
@@ -467,6 +520,7 @@ public static StorageType getStorageType(Integer deployMode) {
case YARN_CLIENT:
return StorageType.HDFS;
case REMOTE:
case LOCAL:
return StorageType.LFS;
default:
throw new UnsupportedOperationException("Unsupported ".concat(deployModeEnum.getName()));