Skip to content

Commit

Permalink
FlowLauncher
Browse files Browse the repository at this point in the history
  • Loading branch information
ygang committed Oct 31, 2022
1 parent 074aedf commit 8c8e336
Showing 1 changed file with 38 additions and 8 deletions.
46 changes: 38 additions & 8 deletions piflow-core/src/main/scala/cn/piflow/util/FlowLauncher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,50 @@ import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
import org.apache.spark.SparkFiles
import org.apache.spark.launcher.SparkLauncher
import com.alibaba.fastjson.{JSON, JSONObject}

import scala.collection.mutable



/**
* Created by [email protected] on 4/30/19
*/
* Created by [email protected] on 4/30/19
*/
object FlowLauncher {

def launch(flow: Flow) : SparkLauncher = {

var flowJson = flow.getFlowJson()
println("FlowLauncher json:" + flowJson)

val flowObject: JSONObject = JSON.parseObject(flowJson)

val stopsJsonArray = flowObject.getJSONObject("flow").getJSONArray("stops")

val dockerExecutor = new StringBuilder()
for (i<- 0 until stopsJsonArray.size()){
if(stopsJsonArray.getJSONObject(i).getJSONObject("properties").containsKey("ymlPath")) {
val ymlPath = stopsJsonArray.getJSONObject(i).getJSONObject("properties").getOrDefault("ymlPath", "").toString
val unzipDir= ymlPath.substring(ymlPath.lastIndexOf("/")+1).replace(".zip","")
dockerExecutor.append(ymlPath+"#"+unzipDir)
dockerExecutor.append(",")
}
if(stopsJsonArray.getJSONObject(i).getJSONObject("properties").containsKey("zipPath")) {
val zipPath = stopsJsonArray.getJSONObject(i).getJSONObject("properties").getOrDefault("zipPath", "").toString
val unzipDir= zipPath.substring(zipPath.lastIndexOf("/")+1).replace(".zip","")
dockerExecutor.append(zipPath+"#app/"+unzipDir)
dockerExecutor.append(",")
}
}

println(dockerExecutor)

var distArchives= ""
if(dockerExecutor.length >1){
distArchives = dockerExecutor.toString().stripPrefix(",")
}


val flowFileName = flow.getFlowName() + new Date().getTime
val flowFile = FlowFileUtil.getFlowFilePath(flowFileName)
FileUtil.writeFile(flowJson, flowFile)
Expand All @@ -30,8 +63,6 @@ object FlowLauncher {
var appId : String = ""
val countDownLatch = new CountDownLatch(1)
val launcher = new SparkLauncher
val pyspark= PropertyUtil.getPropertyValue("fs.defaultFS") + "/piflow/pythonEnv/pyspark_venv.zip#pyspark"
val pythonExecutor= PropertyUtil.getPropertyValue("fs.defaultFS") + "/piflow/pythonExecutor.zip#pythonExecutor"

val sparkLauncher =launcher
.setAppName(flow.getFlowName())
Expand All @@ -43,13 +74,12 @@ object FlowLauncher {
.setConf("spark.executor.instances", flow.getExecutorNum())
.setConf("spark.executor.memory", flow.getExecutorMem())
.setConf("spark.executor.cores",flow.getExecutorCores())
.setConf("spark.driver.allowMultipleContexts","true")
.setConf("spark.driver.allowMultipleContexts","true")
.setConf("spark.pyspark.python","pyspark/venv/bin/python3")
// .setConf("spark.driver.allowMultipleContexts","true")
// .setConf("spark.pyspark.python","pyspark/venv/bin/python3")
.addFile(PropertyUtil.getConfigureFile())
.addFile(ServerIpUtil.getServerIpFile())
.addFile(flowFile)
.setConf("spark.yarn.dist.archives",pyspark+","+pythonExecutor)
.setConf("spark.yarn.dist.archives",distArchives)
.setMainClass("cn.piflow.api.StartFlowMain")
.addAppArgs(flowFileName)

Expand Down

0 comments on commit 8c8e336

Please sign in to comment.