Skip to content

Commit 5bb7d64

Browse files
committed
fix bug: Execute Python Stop
1 parent d2d0bdf commit 5bb7d64

File tree

4 files changed

+45
-30
lines changed

4 files changed

+45
-30
lines changed

piflow-bundle/src/main/resources/flow/script/pythonWithDataFrame.json

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,28 +5,29 @@
55
"stops":[
66
{
77
"uuid":"1111",
8-
"name":"CsvParser",
9-
"bundle":"cn.piflow.bundle.csv.CsvParser",
8+
"name":"MysqlRead",
9+
"bundle":"cn.piflow.bundle.jdbc.MysqlRead",
1010
"properties":{
11-
"csvPath":"hdfs://10.0.88.13:9000/xjzhu/test.csv",
12-
"header":"false",
13-
"delimiter":",",
14-
"schema":"title,author"
11+
"sql":"select id,name from student",
12+
"url":"jdbc:mysql://10.0.88.24:3306/visualization?useUnicode=true&characterEncoding=utf-8",
13+
"driver":"com.mysql.jdbc.Driver",
14+
"user":"root",
15+
"password":"123456"
1516
}
1617
},
1718
{
1819
"uuid":"2222",
1920
"name":"ExecutePythonWithDataFrame",
2021
"bundle":"cn.piflow.bundle.script.ExecutePythonWithDataFrame",
2122
"properties":{
22-
"script":"import sys\nimport os\n\nimport numpy as np\nfrom scipy import linalg\nimport pandas as pd\n\nimport matplotlib\nmatplotlib.use('Agg')\n\n\ndef listFunction(dictInfo):\n\n return dictInfo",
23+
"script":"import sys\nimport os\nimport numpy as np\ndef listFunction(dictInfo): \n newDict = {\"name\":\"hello new user!\", \"id\":11}\n secondDict = {\"name\":\"hello second user!\", \"id\":12}\n listInfo=[newDict, secondDict]\n return dictInfo + listInfo\n",
2324
"execFunction": "listFunction"
2425
}
2526
}
2627
],
2728
"paths":[
2829
{
29-
"from":"CsvParser",
30+
"from":"MysqlRead",
3031
"outport":"",
3132
"inport":"",
3233
"to":"ExecutePythonWithDataFrame"

piflow-bundle/src/main/scala/cn/piflow/bundle/script/ExecutePythonWithDataFrame.scala

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,24 +75,35 @@ class ExecutePythonWithDataFrame extends ConfigurableStop{
7575
val listInfo = df.toJSON.collectAsList()
7676
jep.eval(s"result = $execFunction($listInfo)")
7777
val resultArrayList = jep.getValue("result",new util.ArrayList().getClass)
78-
println(resultArrayList)
78+
println("Execute Python Result : " + resultArrayList + "!!!!!!!!!!!!!!!!!!!!!")
7979

8080

81-
var resultList = List[Map[String, Any]]()
81+
var resultList = List[Map[String, String]]()
8282
val it = resultArrayList.iterator()
8383
while(it.hasNext){
8484
val i = it.next().asInstanceOf[java.util.HashMap[String, Any]]
8585
val item = mapAsScalaMap(i).toMap[String, Any]
86-
resultList = item +: resultList
86+
var new_item = Map[String, String]()
87+
item.foreach(m => {
88+
val key = m._1
89+
val value = m._2
90+
new_item += (key -> String.valueOf(value))
91+
})
92+
resultList = new_item +: resultList
8793
}
94+
println("Convert Python Result to Scala List: " + resultList + "!!!!!!!!!!!!!!!!!!!!!")
8895

8996

9097
val rows = resultList.map( m => Row(m.values.toSeq:_*))
98+
//println("rows: " + rows + "!!!!!!!!!!!!!!!!!!!!!")
9199
val header = resultList.head.keys.toList
100+
//println("header: " + header + "!!!!!!!!!!!!!!!!!!!!!")
92101
val schema = StructType(header.map(fieldName => new StructField(fieldName, StringType, true)))
102+
println("schema: " + schema + "!!!!!!!!!!!!!!!!!!!!!")
93103

94104
val rdd = spark.sparkContext.parallelize(rows)
95105
val resultDF = spark.createDataFrame(rdd, schema)
106+
resultDF.show()
96107

97108
out.write(resultDF)
98109
}

piflow-bundle/src/main/scala/cn/piflow/bundle/script/ExecuteScala.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class ExecuteScala extends ConfigurableStop{
3434
val pluginName = new PropertyDescriptor()
3535
.name("plugin")
3636
.displayName("Plugin")
37-
.description("The class name of scala code. This field is generated automaticly.")
37+
.description("The class name of scala code.")
3838
.defaultValue("")
3939
.required(true)
4040
descriptor = pluginName :: descriptor

piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ import org.flywaydb.core.api.FlywayException
3131
import org.h2.tools.Server
3232
import spray.json.DefaultJsonProtocol
3333

34+
import scala.io.Source
35+
3436

3537
object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSupport{
3638
implicit val config = ConfigFactory.load()
@@ -295,28 +297,28 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
295297

296298
case HttpRequest(POST, Uri.Path("/group/start"), headers, entity, protocol) =>{
297299

300+
/*try{
301+
302+
val bodyFeature = Unmarshal(entity.withoutSizeLimit())
303+
val flowGroupJson = ""//Await.result(bodyFeature,scala.concurrent.duration.Duration(5,"second"))
304+
305+
val flowGroupExecution = API.startGroup(flowGroupJson)
306+
flowGroupMap += (flowGroupExecution.getGroupId() -> flowGroupExecution)
307+
val result = "{\"group\":{\"id\":\"" + flowGroupExecution.getGroupId() + "\"}}"
308+
Future.successful(HttpResponse(SUCCESS_CODE, entity = result))
309+
}catch {
310+
case ex => {
311+
println(ex)
312+
Future.successful(HttpResponse(FAIL_CODE, entity = "Can not start group!"))
313+
}
314+
}*/
298315
try{
299-
/*entity match {
300-
case HttpEntity.Strict(_, data) =>{
301-
var flowGroupJson = data.utf8String
302-
val flowGroupExecution = API.startGroup(flowGroupJson)
303-
flowGroupMap += (flowGroupExecution.getGroupId() -> flowGroupExecution)
304-
val result = "{\"group\":{\"id\":\"" + flowGroupExecution.getGroupId() + "\"}}"
305-
Future.successful(HttpResponse(SUCCESS_CODE, entity = result))
306-
}
307-
case otherType => {
308-
println(otherType)
309316

310-
val bodyFeature = Unmarshal(entity).to [String]
311-
val flowGroupJson = Await.result(bodyFeature,scala.concurrent.duration.Duration(1,"second"))
312-
val flowGroupExecution = API.startGroup(flowGroupJson)
313-
flowGroupMap += (flowGroupExecution.getGroupId() -> flowGroupExecution)
314-
val result = "{\"group\":{\"id\":\"" + flowGroupExecution.getGroupId() + "\"}}"
315-
Future.successful(HttpResponse(SUCCESS_CODE, entity = result))
316-
}
317-
}*/
318317
val bodyFeature = Unmarshal(entity).to [String]
319318
val flowGroupJson = Await.result(bodyFeature,scala.concurrent.duration.Duration(1,"second"))
319+
//use file to run large group
320+
//val flowGroupJsonPath = Await.result(bodyFeature,scala.concurrent.duration.Duration(1,"second"))
321+
//val flowGroupJson = Source.fromFile(flowGroupJsonPath).getLines().toArray.mkString("\n")*/
320322
val flowGroupExecution = API.startGroup(flowGroupJson)
321323
flowGroupMap += (flowGroupExecution.getGroupId() -> flowGroupExecution)
322324
val result = "{\"group\":{\"id\":\"" + flowGroupExecution.getGroupId() + "\"}}"
@@ -328,6 +330,7 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
328330
}
329331
}
330332

333+
331334
}
332335

333336

0 commit comments

Comments
 (0)