-
Notifications
You must be signed in to change notification settings - Fork 75
/
Copy pathreplay.txt
131 lines (122 loc) · 4.8 KB
/
replay.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def dsTb(timeRS, startDate, endDate, stkList, replayName)
{
if(replayName == "snapshot"){
tab = loadTable("dfs://Test_snapshot", "snapshot")
}
else if(replayName == "order") {
tab = loadTable("dfs://Test_order", "order")
}
else if(replayName == "transaction") {
tab = loadTable("dfs://Test_transaction", "transaction")
}
else {
return NULL
}
ds = replayDS(sqlObj=<select * from tab where MDDate>=startDate and MDDate<endDate and HTSCSecurityID in stkList>, dateColumn='MDDate', timeColumn='MDTime', timeRepartitionSchema=timeRS)
return ds
}
def createEnd(tabName, sortColumn)
{
dbName = "dfs://End"
tbName = "endline"
if(not existsDatabase(dbName))
{
db = database(directory=dbName, partitionType=VALUE, partitionScheme=2023.04.03..2023.04.04)
endTb = table(2200.01.01T23:59:59.000 as DateTime, `END as point, long(0) as ApplSeqNum)
endLine = db.createPartitionedTable(table=endTb, tableName=tbName, partitionColumns=`DateTime)
endLine.append!(endTb)
}
ds = replayDS(sqlObj=<select * from loadTable(dbName, tbName)>, dateColumn='DateTime', timeColumn='DateTime')
inputEnd = dict(["end"], [ds])
dateEnd = dict(["end"], [`DateTime])
timeEnd = dict(["end"], [`DateTime])
if(sortColumn == "NULL")
{
replay(inputTables=inputEnd, outputTables=objByName(tabName), dateColumn=dateEnd, timeColumn=timeEnd, replayRate=-1, absoluteRate=false, parallelLevel=1)
}
else
{
replay(inputTables=inputEnd, outputTables=objByName(tabName), dateColumn=dateEnd, timeColumn=timeEnd, replayRate=-1, absoluteRate=false, parallelLevel=1, sortColumns=sortColumn)
}
}
def replayJob(inputDict, tabName, dateDict, timeDict, replayRate, sortColumn)
{
if(sortColumn == "NULL")
{
replay(inputTables=inputDict, outputTables=objByName(tabName), dateColumn=dateDict, timeColumn=timeDict, replayRate=int(replayRate), absoluteRate=false, parallelLevel=23)
}
else
{
replay(inputTables=inputDict, outputTables=objByName(tabName), dateColumn=dateDict, timeColumn=timeDict, replayRate=int(replayRate), absoluteRate=false, parallelLevel=23, sortColumns=sortColumn)
}
createEnd(tabName, sortColumn)
}
def stkReplay(stkList, mutable startDate, mutable endDate, replayRate, replayUuid, replayName)
{
maxCnt = 50
returnBody = dict(STRING, STRING)
startDate = datetimeParse(startDate, "yyyyMMdd")
endDate = datetimeParse(endDate, "yyyyMMdd") + 1
sortColumn = "ApplSeqNum"
if(stkList.size() > maxCnt)
{
returnBody["errorCode"] = "0"
returnBody["errorMsg"] = "超过单次回放股票上限,最大回放上限:" + string(maxCnt)
return returnBody
}
if(size(replayName) != 0)
{
for(name in replayName)
{
if(not name in ["snapshot", "order", "transaction"])
{
returnBody["errorCode"] = "0"
returnBody["errorMsg"] = "请输入正确的数据源名称,不能识别的数据源名称:" + name
return returnBody
}
}
}
else
{
returnBody["errorCode"] = "0"
returnBody["errorMsg"] = "缺少回放数据源,请输入正确的数据源名称"
return returnBody
}
try
{
if(size(replayName) == 1 && replayName[0] == "snapshot")
{
colName = ["timestamp", "biz_type", "biz_data"]
colType = [TIMESTAMP, SYMBOL, BLOB]
sortColumn = "NULL"
}
else
{
colName = ["timestamp", "biz_type", "biz_data", sortColumn]
colType = [TIMESTAMP, SYMBOL, BLOB, LONG]
}
msgTmp = streamTable(10000000:0, colName, colType)
tabName = "replay_" + replayUuid
enableTableShareAndPersistence(table=msgTmp, tableName=tabName, asynWrite=true, compress=true, cacheSize=10000000, retentionMinutes=60, flushMode=0, preCache=1000000)
timeRS = cutPoints(09:30:00.000..15:00:00.000, 23)
inputDict = dict(replayName, each(dsTb{timeRS, startDate, endDate, stkList}, replayName))
dateDict = dict(replayName, take(`MDDate, replayName.size()))
timeDict = dict(replayName, take(`MDTime, replayName.size()))
jobId = "replay_" + replayUuid
jobDesc = "replay stock data"
submitJob(jobId, jobDesc, replayJob{inputDict, tabName, dateDict, timeDict, replayRate, sortColumn})
returnBody["errorCode"] = "1"
returnBody["errorMsg"] = "后台回放成功"
return returnBody
}
catch(ex)
{
returnBody["errorCode"] = "0"
returnBody["errorMsg"] = "回放行情数据异常,异常信息:" + ex
return returnBody
}
}
addFunctionView(dsTb)
addFunctionView(createEnd)
addFunctionView(replayJob)
addFunctionView(stkReplay)