-
Notifications
You must be signed in to change notification settings - Fork 75
/
Copy pathorigin.dos
34 lines (31 loc) · 1.76 KB
/
origin.dos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//2. 优化前代码
def toIntDate(d){
return year(d) * 10000 + monthOfYear(d) * 100 + dayOfMonth(d)
}
def genDataV1(date1, dateN){
tradeSrc = loadTable("dfs://originData", "trade")
tradeTgt = loadTable("dfs://formatData", "trade")
timer for (aDate in date1..dateN){
tradeSecurityID = (exec distinct(securityID) from tradeSrc where tradingdate = aDate).shuffle()
for (m in tradeSecurityID){
tradingdf = select * from tradeSrc where securityID = m and tradingdate = aDate
tradingdf["symbol"] = m + "SZ"
print("stock " + m + ",date is " + aDate + ",tradingdf size " + tradingdf.size())
tradingdf["buysellflag"] =iif(tradingdf["sellorderid"] > tradingdf["buyorderid"],"S", "B")
tradingdf["tradeamount"] = tradingdf["tradevolume"] * tradingdf["tradeprice"]
tradingdf = tradingdf[(tradingdf["tradetype"] == "0") || (tradingdf["tradetype"] == "F")]
tradingdf = select symbol,tradingdate, tradingtime, recid, tradeprice, tradevolume, tradeamount, buyorderid, sellorderid, buysellflag, unix from tradingdf
tradingdf = select * from tradingdf order by symbol, tradingtime, recid
tradingdf.replaceColumn!("tradingdate", toIntDate(::date(tradingdf.tradingDate)))
tradingtime = string(exec tradingtime from tradingdf)
tradingdf.replaceColumn!(`tradingtime, tradingtime)
unix = long(exec unix from tradingdf)
tradingdf.replaceColumn!(`unix, unix)
tradeTgt.append!(tradingdf)
}
}
}
jobId = "originTranfer"
jobdesc = "origin method for etl data"
submitJob(jobId, jobdesc, genDataV1, 2022.05.01, 2022.05.20)
getRecentJobs()